2026-03-25 19:25:01 -04:00
""" Timmy ' s scheduled work — orchestration, sovereignty, heartbeat. """
2026-03-25 21:12:26 +00:00
2026-03-25 19:25:01 -04:00
import glob
2026-03-28 22:00:56 -04:00
import html
import json
2026-03-25 19:25:01 -04:00
import os
2026-03-28 22:00:56 -04:00
import re
import socket
2026-03-25 19:25:01 -04:00
import subprocess
2026-03-25 21:15:36 +00:00
import sys
2026-03-28 22:00:56 -04:00
import urllib . parse
import urllib . request
from datetime import datetime , timedelta , timezone
2026-03-25 21:15:36 +00:00
from pathlib import Path
2026-03-25 21:12:26 +00:00
from orchestration import huey
from huey import crontab
2026-03-25 19:05:29 -04:00
from gitea_client import GiteaClient
2026-03-25 21:12:26 +00:00
2026-03-25 19:25:01 -04:00
HERMES_HOME = Path . home ( ) / " .hermes "
TIMMY_HOME = Path . home ( ) / " .timmy "
2026-03-26 17:00:22 -04:00
HERMES_AGENT_DIR = HERMES_HOME / " hermes-agent "
2026-03-27 18:48:36 -04:00
HERMES_PYTHON = HERMES_AGENT_DIR / " venv " / " bin " / " python3 "
2026-03-26 17:00:22 -04:00
METRICS_DIR = TIMMY_HOME / " metrics "
2026-03-25 21:12:26 +00:00
REPOS = [
" Timmy_Foundation/the-nexus " ,
" Timmy_Foundation/timmy-config " ,
]
NET_LINE_LIMIT = 10
2026-03-28 22:00:56 -04:00
BRIEFING_DIR = TIMMY_HOME / " briefings " / " good-morning "
TELEGRAM_BOT_TOKEN_FILE = Path . home ( ) / " .config " / " telegram " / " special_bot "
TELEGRAM_CHAT_ID = " -1003664764329 "
2026-03-25 21:12:26 +00:00
2026-03-26 17:00:22 -04:00
# ── Local Model Inference via Hermes Harness ─────────────────────────
HEARTBEAT_MODEL = " hermes4:14b "
FALLBACK_MODEL = " hermes3:8b "
2026-03-27 17:35:07 -04:00
LOCAL_PROVIDER_BASE_URL = " http://localhost:8081/v1 "
LOCAL_PROVIDER_MODEL = HEARTBEAT_MODEL
2026-03-27 18:09:28 -04:00
JSON_DECODER = json . JSONDecoder ( )
2026-03-27 17:35:07 -04:00
def newest_file ( directory , pattern ) :
files = sorted ( directory . glob ( pattern ) )
return files [ - 1 ] if files else None
2026-03-26 17:00:22 -04:00
2026-03-27 18:48:36 -04:00
def run_hermes_local (
prompt ,
model = None ,
caller_tag = None ,
toolsets = None ,
system_prompt = None ,
disable_all_tools = False ,
skip_context_files = False ,
skip_memory = False ,
max_iterations = 30 ,
) :
2026-03-27 16:00:29 -04:00
""" Call a local model through the Hermes harness.
2026-03-26 17:00:22 -04:00
2026-03-27 18:48:36 -04:00
Runs Hermes inside its own venv so task execution matches the same
environment and provider routing as normal Hermes usage .
2026-03-27 18:09:28 -04:00
Returns response text plus session metadata or None on failure .
2026-03-26 17:00:22 -04:00
Every call creates a Hermes session with telemetry .
"""
_model = model or HEARTBEAT_MODEL
tagged = f " [ { caller_tag } ] { prompt } " if caller_tag else prompt
try :
2026-03-27 18:48:36 -04:00
runner = """
import io
import json
import sys
from contextlib import redirect_stderr , redirect_stdout
from pathlib import Path
agent_dir = Path ( sys . argv [ 1 ] )
query = sys . argv [ 2 ]
model = sys . argv [ 3 ]
system_prompt = sys . argv [ 4 ] or None
disable_all_tools = sys . argv [ 5 ] == " 1 "
skip_context_files = sys . argv [ 6 ] == " 1 "
skip_memory = sys . argv [ 7 ] == " 1 "
max_iterations = int ( sys . argv [ 8 ] )
if str ( agent_dir ) not in sys . path :
sys . path . insert ( 0 , str ( agent_dir ) )
from hermes_cli . runtime_provider import resolve_runtime_provider
from run_agent import AIAgent
from toolsets import get_all_toolsets
buf = io . StringIO ( )
err = io . StringIO ( )
payload = { }
exit_code = 0
try :
runtime = resolve_runtime_provider ( )
kwargs = {
" model " : model ,
" api_key " : runtime . get ( " api_key " ) ,
" base_url " : runtime . get ( " base_url " ) ,
" provider " : runtime . get ( " provider " ) ,
" api_mode " : runtime . get ( " api_mode " ) ,
" acp_command " : runtime . get ( " command " ) ,
" acp_args " : list ( runtime . get ( " args " ) or [ ] ) ,
" max_iterations " : max_iterations ,
" quiet_mode " : True ,
" ephemeral_system_prompt " : system_prompt ,
" skip_context_files " : skip_context_files ,
" skip_memory " : skip_memory ,
}
if disable_all_tools :
kwargs [ " disabled_toolsets " ] = sorted ( get_all_toolsets ( ) . keys ( ) )
agent = AIAgent ( * * kwargs )
with redirect_stdout ( buf ) , redirect_stderr ( err ) :
result = agent . run_conversation ( query , sync_honcho = False )
payload = {
" response " : result . get ( " final_response " , " " ) ,
" session_id " : getattr ( agent , " session_id " , None ) ,
" provider " : runtime . get ( " provider " ) ,
" base_url " : runtime . get ( " base_url " ) ,
" stdout " : buf . getvalue ( ) ,
" stderr " : err . getvalue ( ) ,
}
except Exception as exc :
exit_code = 1
payload = {
" error " : str ( exc ) ,
" stdout " : buf . getvalue ( ) ,
" stderr " : err . getvalue ( ) ,
}
print ( json . dumps ( payload ) )
sys . exit ( exit_code )
"""
command = [
str ( HERMES_PYTHON ) if HERMES_PYTHON . exists ( ) else sys . executable ,
" -c " ,
runner ,
str ( HERMES_AGENT_DIR ) ,
tagged ,
_model ,
system_prompt or " " ,
" 1 " if disable_all_tools else " 0 " ,
" 1 " if skip_context_files else " 0 " ,
" 1 " if skip_memory else " 0 " ,
str ( max_iterations ) ,
]
result = subprocess . run (
command ,
cwd = str ( HERMES_AGENT_DIR ) ,
capture_output = True ,
text = True ,
timeout = 900 ,
)
payload = json . loads ( ( result . stdout or " " ) . strip ( ) or " {} " )
output = str ( payload . get ( " response " , " " ) ) . strip ( )
stderr_output = str ( payload . get ( " stderr " , " " ) ) . strip ( )
stdout_output = str ( payload . get ( " stdout " , " " ) ) . strip ( )
if result . returncode != 0 :
raise RuntimeError (
(
result . stderr
or str ( payload . get ( " error " , " " ) ) . strip ( )
or stderr_output
or stdout_output
or output
or " hermes run failed "
) . strip ( )
2026-03-26 17:00:22 -04:00
)
2026-03-27 18:48:36 -04:00
session_id = payload . get ( " session_id " )
response = output
2026-03-26 17:00:22 -04:00
# Log to metrics jsonl
METRICS_DIR . mkdir ( parents = True , exist_ok = True )
metrics_file = METRICS_DIR / f " local_ { datetime . now ( ) . strftime ( ' % Y % m %d ' ) } .jsonl "
record = {
" timestamp " : datetime . now ( timezone . utc ) . isoformat ( ) ,
" model " : _model ,
" caller " : caller_tag or " unknown " ,
" prompt_len " : len ( prompt ) ,
" response_len " : len ( response ) ,
2026-03-27 18:09:28 -04:00
" session_id " : session_id ,
2026-03-26 17:00:22 -04:00
" success " : bool ( response ) ,
}
with open ( metrics_file , " a " ) as f :
f . write ( json . dumps ( record ) + " \n " )
2026-03-27 18:09:28 -04:00
if not response :
return None
return {
" response " : response ,
" session_id " : session_id ,
2026-03-27 18:48:36 -04:00
" raw_output " : json . dumps ( payload , sort_keys = True ) ,
2026-03-27 18:09:28 -04:00
}
2026-03-26 17:00:22 -04:00
except Exception as e :
# Log failure
METRICS_DIR . mkdir ( parents = True , exist_ok = True )
metrics_file = METRICS_DIR / f " local_ { datetime . now ( ) . strftime ( ' % Y % m %d ' ) } .jsonl "
record = {
" timestamp " : datetime . now ( timezone . utc ) . isoformat ( ) ,
" model " : _model ,
" caller " : caller_tag or " unknown " ,
" error " : str ( e ) ,
" success " : False ,
}
with open ( metrics_file , " a " ) as f :
f . write ( json . dumps ( record ) + " \n " )
return None
2026-03-25 21:12:26 +00:00
2026-03-27 18:09:28 -04:00
def hermes_local ( prompt , model = None , caller_tag = None , toolsets = None ) :
result = run_hermes_local (
prompt = prompt ,
model = model ,
caller_tag = caller_tag ,
toolsets = toolsets ,
)
if not result :
return None
return result . get ( " response " )
2026-03-27 18:48:36 -04:00
ARCHIVE_EPHEMERAL_SYSTEM_PROMPT = (
" You are running a private archive-processing microtask for Timmy. \n "
" Use only the supplied user message. \n "
" Do not use tools, memory, Honcho, SOUL.md, AGENTS.md, or outside knowledge. \n "
" Do not invent facts. \n "
" If the prompt requests JSON, return only valid JSON. "
)
def run_archive_hermes ( prompt , caller_tag , model = None ) :
return run_hermes_local (
prompt = prompt ,
model = model ,
caller_tag = caller_tag ,
system_prompt = ARCHIVE_EPHEMERAL_SYSTEM_PROMPT ,
disable_all_tools = True ,
skip_context_files = True ,
skip_memory = True ,
max_iterations = 3 ,
)
2026-03-27 16:00:29 -04:00
# ── Know Thy Father: Twitter Archive Ingestion ───────────────────────
ARCHIVE_DIR = TIMMY_HOME / " twitter-archive "
2026-03-27 18:09:28 -04:00
ARCHIVE_EXTRACTED_DIR = ARCHIVE_DIR / " extracted "
ARCHIVE_NOTES_DIR = ARCHIVE_DIR / " notes "
ARCHIVE_KNOWLEDGE_DIR = ARCHIVE_DIR / " knowledge "
ARCHIVE_CANDIDATES_DIR = ARCHIVE_KNOWLEDGE_DIR / " candidates "
ARCHIVE_PROFILE_FILE = ARCHIVE_KNOWLEDGE_DIR / " profile.json "
ARCHIVE_CHANGES_FILE = ARCHIVE_KNOWLEDGE_DIR / " changes.jsonl "
ARCHIVE_INSIGHTS_DIR = ARCHIVE_DIR / " insights "
ARCHIVE_TRAINING_DIR = ARCHIVE_DIR / " training "
ARCHIVE_TRAINING_EXAMPLES_DIR = ARCHIVE_TRAINING_DIR / " examples "
ARCHIVE_TRAINING_DPO_DIR = ARCHIVE_TRAINING_DIR / " dpo "
ARCHIVE_TRAINING_EVALS_DIR = ARCHIVE_TRAINING_DIR / " evals "
ARCHIVE_TRAINING_RUNS_DIR = ARCHIVE_TRAINING_DIR / " runs "
ARCHIVE_METRICS_DIR = ARCHIVE_DIR / " metrics "
2026-03-27 16:00:29 -04:00
ARCHIVE_CHECKPOINT = ARCHIVE_DIR / " checkpoint.json "
ARCHIVE_LOCK = ARCHIVE_DIR / " .lock "
2026-03-27 18:09:28 -04:00
ARCHIVE_PROGRESS_FILE = ARCHIVE_METRICS_DIR / " progress.json "
ARCHIVE_SOURCE_CONFIG = ARCHIVE_DIR / " source_config.json "
ARCHIVE_PIPELINE_CONFIG = ARCHIVE_DIR / " pipeline_config.json "
ARCHIVE_TWEETS_FILE = ARCHIVE_EXTRACTED_DIR / " tweets.jsonl "
ARCHIVE_RETWEETS_FILE = ARCHIVE_EXTRACTED_DIR / " retweets.jsonl "
ARCHIVE_MANIFEST_FILE = ARCHIVE_EXTRACTED_DIR / " manifest.json "
ARCHIVE_TRAIN_STATE_FILE = ARCHIVE_TRAINING_DIR / " last_train_state.json "
ARCHIVE_ACTIVE_MODEL_FILE = ARCHIVE_TRAINING_DIR / " active_model.json "
ARCHIVE_PROMOTION_STATE_FILE = ARCHIVE_TRAINING_DIR / " promotion_state.json "
ARCHIVE_BATCH_SIZE = 50
def ensure_archive_layout ( ) :
for path in (
ARCHIVE_DIR ,
ARCHIVE_EXTRACTED_DIR ,
ARCHIVE_NOTES_DIR ,
ARCHIVE_KNOWLEDGE_DIR ,
ARCHIVE_CANDIDATES_DIR ,
ARCHIVE_INSIGHTS_DIR ,
ARCHIVE_TRAINING_DIR ,
ARCHIVE_TRAINING_EXAMPLES_DIR ,
ARCHIVE_TRAINING_DPO_DIR ,
ARCHIVE_TRAINING_EVALS_DIR ,
ARCHIVE_TRAINING_RUNS_DIR ,
ARCHIVE_METRICS_DIR ,
) :
path . mkdir ( parents = True , exist_ok = True )
def read_json ( path , default ) :
if not path . exists ( ) :
return json . loads ( json . dumps ( default ) )
try :
return json . loads ( path . read_text ( ) )
except json . JSONDecodeError :
return json . loads ( json . dumps ( default ) )
def write_json ( path , payload ) :
path . parent . mkdir ( parents = True , exist_ok = True )
path . write_text ( json . dumps ( payload , indent = 2 , sort_keys = True ) + " \n " )
def write_text ( path , payload ) :
path . parent . mkdir ( parents = True , exist_ok = True )
cleaned = payload . rstrip ( )
path . write_text ( ( cleaned + " \n " ) if cleaned else " " )
def load_jsonl ( path ) :
if not path . exists ( ) :
return [ ]
rows = [ ]
for line in path . read_text ( ) . splitlines ( ) :
line = line . strip ( )
if not line :
continue
rows . append ( json . loads ( line ) )
return rows
def write_jsonl ( path , rows ) :
path . parent . mkdir ( parents = True , exist_ok = True )
with open ( path , " w " ) as handle :
for row in rows :
handle . write ( json . dumps ( row , sort_keys = True ) + " \n " )
def append_jsonl ( path , rows ) :
if not rows :
return
path . parent . mkdir ( parents = True , exist_ok = True )
with open ( path , " a " ) as handle :
for row in rows :
handle . write ( json . dumps ( row , sort_keys = True ) + " \n " )
2026-03-27 16:00:29 -04:00
2026-03-27 18:09:28 -04:00
def latest_path ( directory , pattern ) :
matches = sorted ( directory . glob ( pattern ) )
return matches [ - 1 ] if matches else None
def count_jsonl_rows ( path ) :
if not path . exists ( ) :
return 0
with open ( path ) as handle :
return sum ( 1 for line in handle if line . strip ( ) )
2026-03-28 22:00:56 -04:00
def port_open ( port ) :
sock = socket . socket ( )
sock . settimeout ( 1 )
try :
sock . connect ( ( " 127.0.0.1 " , port ) )
return True
except Exception :
return False
finally :
sock . close ( )
def fetch_http_title ( url ) :
try :
with urllib . request . urlopen ( url , timeout = 5 ) as resp :
raw = resp . read ( ) . decode ( " utf-8 " , " ignore " )
match = re . search ( r " <title>(.*?)</title> " , raw , re . IGNORECASE | re . DOTALL )
return match . group ( 1 ) . strip ( ) if match else " NO TITLE "
except Exception as exc :
return f " ERROR: { exc } "
def latest_files ( root , limit = 5 ) :
root = Path ( root )
if not root . exists ( ) :
return [ ]
items = [ ]
for path in root . rglob ( " * " ) :
if not path . is_file ( ) :
continue
try :
stat = path . stat ( )
except OSError :
continue
items . append ( ( stat . st_mtime , path , stat . st_size ) )
items . sort ( reverse = True )
return [
{
" path " : str ( path ) ,
" mtime " : datetime . fromtimestamp ( mtime ) . isoformat ( ) ,
" size " : size ,
}
for mtime , path , size in items [ : limit ]
]
def read_jsonl_rows ( path ) :
path = Path ( path )
if not path . exists ( ) :
return [ ]
rows = [ ]
with open ( path ) as handle :
for line in handle :
line = line . strip ( )
if not line :
continue
try :
rows . append ( json . loads ( line ) )
except Exception :
continue
return rows
def telegram_send_document ( path , caption ) :
if not TELEGRAM_BOT_TOKEN_FILE . exists ( ) :
return { " ok " : False , " error " : " token file missing " }
token = TELEGRAM_BOT_TOKEN_FILE . read_text ( ) . strip ( )
result = subprocess . run (
[
" curl " ,
" -s " ,
" -X " ,
" POST " ,
f " https://api.telegram.org/bot { token } /sendDocument " ,
" -F " ,
f " chat_id= { TELEGRAM_CHAT_ID } " ,
" -F " ,
f " caption= { caption } " ,
" -F " ,
f " document=@ { path } " ,
] ,
capture_output = True ,
text = True ,
timeout = 30 ,
)
try :
return json . loads ( result . stdout . strip ( ) or " {} " )
except Exception :
return { " ok " : False , " error " : result . stdout . strip ( ) or result . stderr . strip ( ) }
def telegram_send_message ( text , parse_mode = " HTML " ) :
if not TELEGRAM_BOT_TOKEN_FILE . exists ( ) :
return { " ok " : False , " error " : " token file missing " }
token = TELEGRAM_BOT_TOKEN_FILE . read_text ( ) . strip ( )
payload = urllib . parse . urlencode (
{
" chat_id " : TELEGRAM_CHAT_ID ,
" text " : text ,
" parse_mode " : parse_mode ,
" disable_web_page_preview " : " false " ,
}
) . encode ( )
try :
req = urllib . request . Request (
f " https://api.telegram.org/bot { token } /sendMessage " ,
data = payload ,
)
with urllib . request . urlopen ( req , timeout = 20 ) as resp :
return json . loads ( resp . read ( ) . decode ( ) )
except Exception as exc :
return { " ok " : False , " error " : str ( exc ) }
def open_report_in_browser ( path ) :
try :
subprocess . run ( [ " open " , str ( path ) ] , check = True , timeout = 10 )
return { " ok " : True }
except Exception as exc :
return { " ok " : False , " error " : str ( exc ) }
def render_evening_html ( title , subtitle , executive_summary , local_pulse , gitea_lines , research_lines , what_matters , look_first ) :
return f """ <!doctype html>
< html lang = \" en \" >
< head >
< meta charset = \" utf-8 \" >
< meta name = \" viewport \" content= \" width=device-width, initial-scale=1 \" >
< title > { html . escape ( title ) } < / title >
< style >
: root { { - - bg : #07101b; --panel:#0d1b2a; --text:#ecf3ff; --muted:#9bb1c9; --accent:#5eead4; --link:#8ec5ff; }}
* { { box - sizing : border - box ; } }
body { { margin : 0 ; font - family : Inter , system - ui , - apple - system , sans - serif ; background : radial - gradient ( circle at top , #14253a 0%,#07101b 55%,#04080f 100%); color:var(--text); }}
. wrap { { max - width : 1100 px ; margin : 0 auto ; padding : 48 px 22 px 80 px ; } }
. hero { { background : linear - gradient ( 135 deg , rgba ( 94 , 234 , 212 , .14 ) , rgba ( 124 , 58 , 237 , .16 ) ) ; border : 1 px solid rgba ( 142 , 197 , 255 , .16 ) ; border - radius : 24 px ; padding : 34 px 30 px ; box - shadow : 0 20 px 50 px rgba ( 0 , 0 , 0 , .25 ) ; } }
. kicker { { text - transform : uppercase ; letter - spacing : .16 em ; color : var ( - - accent ) ; font - size : 12 px ; font - weight : 700 ; } }
h1 { { margin : 10 px 0 8 px ; font - size : 42 px ; line - height : 1.05 ; } }
. subtitle { { color : var ( - - muted ) ; font - size : 15 px ; } }
. grid { { display : grid ; grid - template - columns : repeat ( auto - fit , minmax ( 280 px , 1 fr ) ) ; gap : 18 px ; margin - top : 24 px ; } }
. card { { background : rgba ( 13 , 27 , 42 , .9 ) ; border : 1 px solid rgba ( 142 , 197 , 255 , .12 ) ; border - radius : 20 px ; padding : 20 px ; } }
. card h2 { { margin : 0 0 12 px ; font - size : 22 px ; } }
. card p , . card li { { line - height : 1.55 ; } }
. card ul { { margin : 0 ; padding - left : 18 px ; } }
a { { color : var ( - - link ) ; text - decoration : none ; } }
a : hover { { text - decoration : underline ; } }
. footer { { margin - top : 26 px ; color : var ( - - muted ) ; font - size : 14 px ; } }
< / style >
< / head >
< body >
< div class = \" wrap \" >
< div class = \" hero \" >
< div class = \" kicker \" >timmy time · morning report</div>
< h1 > { html . escape ( title ) } < / h1 >
< div class = \" subtitle \" > { html.escape(subtitle)}</div>
< / div >
< div class = \" grid \" >
< div class = \" card \" ><h2>Executive Summary</h2><p> { html.escape(executive_summary)}</p></div>
< div class = \" card \" ><h2>Local Pulse</h2><ul> { ' ' .join(f ' <li> { html.escape(line)}</li> ' for line in local_pulse)}</ul></div>
< / div >
< div class = \" grid \" >
< div class = \" card \" ><h2>Gitea Pulse</h2><ul> { ' ' .join(f ' <li> {line} </li> ' for line in gitea_lines)}</ul></div>
< div class = \" card \" ><h2>Pertinent Research</h2><ul> { ' ' .join(f ' <li> { html.escape(line)}</li> ' for line in research_lines)}</ul></div>
< div class = \" card \" ><h2>What Matters Today</h2><ul> { ' ' .join(f ' <li> { html.escape(line)}</li> ' for line in what_matters)}</ul></div>
< / div >
< div class = \" card \" style= \" margin-top:18px \" ><h2>Look Here First</h2><p> { html.escape(look_first)}</p></div>
< div class = \" footer \" >Generated locally on the Mac for Alexander Whitestone. Sovereignty and service always.</div>
< / div >
< / body >
< / html > """
2026-03-27 18:09:28 -04:00
def archive_default_checkpoint ( ) :
return {
" data_source " : " tweets " ,
" batch_size " : ARCHIVE_BATCH_SIZE ,
" next_offset " : 0 ,
" batches_completed " : 0 ,
" phase " : " discovery " ,
" confidence " : " low " ,
" next_focus " : " look for recurring themes and recurring people " ,
" understanding_version " : 0 ,
" last_batch_id " : None ,
" last_batch_sessions " : { } ,
" last_profile_update " : None ,
" last_dpo_build " : None ,
" last_insight_file " : None ,
}
def load_archive_checkpoint ( ) :
checkpoint = archive_default_checkpoint ( )
checkpoint . update ( read_json ( ARCHIVE_CHECKPOINT , { } ) )
return checkpoint
def load_pipeline_config ( ) :
return read_json ( ARCHIVE_PIPELINE_CONFIG , { } )
def load_train_state ( ) :
return read_json (
ARCHIVE_TRAIN_STATE_FILE ,
{
" last_total_batches " : 0 ,
" last_total_pairs " : 0 ,
" last_candidate_id " : None ,
" awaiting_eval " : False ,
" last_run_status " : " never-run " ,
" last_run_at " : None ,
} ,
)
def extract_first_json_object ( text ) :
cleaned = text . strip ( ) . replace ( " ```json " , " " ) . replace ( " ``` " , " " )
for index , character in enumerate ( cleaned ) :
if character != " { " :
continue
try :
payload , _ = JSON_DECODER . raw_decode ( cleaned [ index : ] )
except json . JSONDecodeError :
continue
if isinstance ( payload , dict ) :
return payload
raise ValueError ( " No JSON object found " )
def parse_json_output ( stdout = " " , stderr = " " ) :
for source in ( stdout or " " , stderr or " " ) :
if not source . strip ( ) :
continue
try :
return extract_first_json_object ( source )
except ValueError :
continue
return { }
def run_timmy_home_module ( module_name , args = None , timeout = 120 ) :
ensure_archive_layout ( )
command = [ sys . executable , " -m " , module_name ]
if args :
command . extend ( args )
result = subprocess . run (
command ,
cwd = str ( TIMMY_HOME ) ,
capture_output = True ,
text = True ,
timeout = timeout ,
)
payload = parse_json_output ( result . stdout , result . stderr )
if not payload :
payload = {
" stdout " : result . stdout . strip ( ) ,
" stderr " : result . stderr . strip ( ) ,
}
payload [ " returncode " ] = result . returncode
if result . returncode != 0 :
payload . setdefault ( " status " , " error " )
else :
payload . setdefault ( " status " , " ok " )
return payload
def archive_counts ( ) :
total_batches = len ( list ( ARCHIVE_CANDIDATES_DIR . glob ( " batch_*.json " ) ) )
total_pairs = sum ( count_jsonl_rows ( path ) for path in ARCHIVE_TRAINING_DPO_DIR . glob ( " pairs_*.jsonl " ) )
return {
" total_batches " : total_batches ,
" total_pairs " : total_pairs ,
}
def archive_progress_snapshot ( ) :
checkpoint = load_archive_checkpoint ( )
profile = read_json ( ARCHIVE_PROFILE_FILE , { " claims " : [ ] } )
durable_claims = [
claim for claim in profile . get ( " claims " , [ ] ) if claim . get ( " status " ) == " durable "
]
snapshot = {
" batches_completed " : checkpoint . get ( " batches_completed " , 0 ) ,
" next_offset " : checkpoint . get ( " next_offset " , 0 ) ,
" phase " : checkpoint . get ( " phase " , " discovery " ) ,
" candidate_batches " : len ( list ( ARCHIVE_CANDIDATES_DIR . glob ( " batch_*.json " ) ) ) ,
" durable_claims " : len ( durable_claims ) ,
" training_examples " : sum (
count_jsonl_rows ( path ) for path in ARCHIVE_TRAINING_EXAMPLES_DIR . glob ( " batch_*.jsonl " )
) ,
" dpo_pair_files " : len ( list ( ARCHIVE_TRAINING_DPO_DIR . glob ( " pairs_*.jsonl " ) ) ) ,
" dpo_pairs " : sum (
count_jsonl_rows ( path ) for path in ARCHIVE_TRAINING_DPO_DIR . glob ( " pairs_*.jsonl " )
) ,
" latest_dpo_file " : latest_path ( ARCHIVE_TRAINING_DPO_DIR , " pairs_*.jsonl " ) . name
if latest_path ( ARCHIVE_TRAINING_DPO_DIR , " pairs_*.jsonl " )
else None ,
" latest_note " : latest_path ( ARCHIVE_NOTES_DIR , " batch_*.md " ) . name
if latest_path ( ARCHIVE_NOTES_DIR , " batch_*.md " )
else None ,
" latest_eval " : latest_path ( ARCHIVE_TRAINING_EVALS_DIR , " run_*.json " ) . name
if latest_path ( ARCHIVE_TRAINING_EVALS_DIR , " run_*.json " )
else None ,
}
write_json ( ARCHIVE_PROGRESS_FILE , snapshot )
return snapshot
def archive_batch_id ( batch_number ) :
return f " batch_ { batch_number : 03d } "
def archive_profile_summary ( profile ) :
claims = profile . get ( " claims " , [ ] )
durable = [ claim for claim in claims if claim . get ( " status " ) == " durable " ] [ : 12 ]
provisional = [ claim for claim in claims if claim . get ( " status " ) == " provisional " ] [ : 8 ]
return {
" durable_claims " : durable ,
" provisional_claims " : provisional ,
}
def format_tweets_for_prompt ( rows ) :
formatted = [ ]
for index , row in enumerate ( rows , start = 1 ) :
formatted . append (
f " { index } . tweet_id= { row . get ( ' tweet_id ' ) } created_at= { row . get ( ' created_at ' ) } \n "
f " text= { row . get ( ' full_text ' ) } "
)
return " \n \n " . join ( formatted )
def normalize_candidate_entry ( candidate , batch_id , index ) :
category = str ( candidate . get ( " category " ) or " recurring-theme " ) . strip ( )
claim = str ( candidate . get ( " claim " ) or " " ) . strip ( )
if not claim :
return None
quotes = [ ]
for quote in candidate . get ( " evidence_quotes " , [ ] ) [ : 5 ] :
quote = str ( quote ) . strip ( )
if quote and quote not in quotes :
quotes . append ( quote )
evidence_ids = [ ]
for tweet_id in candidate . get ( " evidence_tweet_ids " , [ ] ) :
tweet_id = str ( tweet_id ) . strip ( )
if tweet_id and tweet_id not in evidence_ids :
evidence_ids . append ( tweet_id )
try :
confidence = float ( candidate . get ( " confidence " , 0.5 ) )
except ( TypeError , ValueError ) :
confidence = 0.5
confidence = max ( 0.0 , min ( confidence , 1.0 ) )
status = str ( candidate . get ( " status " ) or " provisional " ) . strip ( ) . lower ( )
if status not in { " provisional " , " durable " , " retracted " } :
status = " provisional "
contradictions = [ ]
for item in candidate . get ( " contradicts " , [ ] ) [ : 5 ] :
item = str ( item ) . strip ( )
if item and item not in contradictions :
contradictions . append ( item )
return {
" id " : f " { batch_id } -candidate- { index : 02d } " ,
" category " : category ,
" claim " : claim ,
" evidence_tweet_ids " : evidence_ids ,
" evidence_quotes " : quotes ,
" confidence " : round ( confidence , 3 ) ,
" status " : status ,
" first_seen_at " : batch_id ,
" last_confirmed_at " : batch_id ,
" contradicts " : contradictions ,
}
def normalize_training_examples ( examples , batch_id , tweet_ids , fallback_prompt , fallback_response ) :
normalized = [ ]
for index , example in enumerate ( examples , start = 1 ) :
prompt = str ( example . get ( " prompt " ) or example . get ( " instruction " ) or " " ) . strip ( )
response = str ( example . get ( " response " ) or example . get ( " answer " ) or " " ) . strip ( )
if not prompt or not response :
continue
normalized . append (
{
" example_id " : f " { batch_id } -example- { index : 02d } " ,
" batch_id " : batch_id ,
" task_type " : str ( example . get ( " task_type " ) or " analysis " ) . strip ( ) or " analysis " ,
" prompt " : prompt ,
" response " : response ,
" tweet_ids " : tweet_ids ,
}
)
if normalized :
return normalized
return [
{
" example_id " : f " { batch_id } -example-01 " ,
" batch_id " : batch_id ,
" task_type " : " analysis " ,
" prompt " : fallback_prompt ,
" response " : fallback_response ,
" tweet_ids " : tweet_ids ,
}
]
def normalize_rubric_scores ( scores ) :
rubric = { }
for key in ( " grounding " , " specificity " , " source_distinction " , " actionability " ) :
try :
rubric [ key ] = float ( scores . get ( key , 0 ) )
except ( TypeError , ValueError ) :
rubric [ key ] = 0.0
return rubric
def build_archive_draft_prompt ( batch_id , checkpoint , profile , prior_note , batch_rows ) :
tweet_ids = [ row . get ( " tweet_id " ) for row in batch_rows ]
previous_summary = archive_profile_summary ( profile )
return (
" You are Timmy, reading Alexander ' s private Twitter archive. \n "
" Work only from the supplied tweets. Do not invent facts. Separate explicit facts from inference. \n "
" Return ONLY valid JSON with this schema: \n "
' { '
' " notes_markdown " : " ... " , '
' " knowledge_candidates " :[ { '
' " category " : " trait|preference|project|relationship|value|recurring-theme " , '
' " claim " : " ... " , '
' " evidence_tweet_ids " :[ " ... " ], '
' " evidence_quotes " :[ " ... " ], '
' " confidence " :0.0, '
' " status " : " provisional " , '
' " contradicts " :[ " optional contradiction hint " ] '
' }], '
' " training_examples " :[ { " prompt " : " ... " , " response " : " ... " , " task_type " : " analysis " }], '
' " phase " : " discovery|synthesis|refinement " , '
' " confidence " : " low|medium|high " , '
' " next_focus " : " ... " '
' } \n \n '
f " Batch id: { batch_id } \n "
f " Checkpoint: { json . dumps ( checkpoint , indent = 2 ) } \n "
f " Previous profile summary: { json . dumps ( previous_summary , indent = 2 ) } \n "
f " Prior batch note excerpt: { prior_note [ - 2500 : ] if prior_note else ' none ' } \n "
f " Tweet ids in this batch: { tweet_ids } \n \n "
" Tweets: \n "
f " { format_tweets_for_prompt ( batch_rows ) } \n "
)
def build_archive_critique_prompt ( batch_id , draft_payload , batch_rows ) :
rubric = {
" grounding " : " Every material claim must be supported by quoted evidence and tweet ids. " ,
" specificity " : " Avoid bland summaries; identify concrete traits, projects, values, and relationships. " ,
" source_distinction " : " Mark inference carefully and never upgrade speculation into fact. " ,
" actionability " : " Training examples should teach Timmy how to read Alexander usefully. " ,
}
return (
" You are the critique pass for Timmy ' s private Twitter archive learning loop. \n "
" Rewrite the draft into a stronger, more grounded version. \n "
" Return ONLY valid JSON with this schema: \n "
' { '
' " notes_markdown " : " ... " , '
' " knowledge_candidates " :[ { '
' " category " : " trait|preference|project|relationship|value|recurring-theme " , '
' " claim " : " ... " , '
' " evidence_tweet_ids " :[ " ... " ], '
' " evidence_quotes " :[ " ... " ], '
' " confidence " :0.0, '
' " status " : " provisional " , '
' " contradicts " :[ " optional contradiction hint " ] '
' }], '
' " training_examples " :[ { " prompt " : " ... " , " response " : " ... " , " task_type " : " analysis " }], '
' " rubric_scores " : { " grounding " :0, " specificity " :0, " source_distinction " :0, " actionability " :0}, '
' " phase " : " discovery|synthesis|refinement " , '
' " confidence " : " low|medium|high " , '
' " next_focus " : " ... " '
' } \n \n '
f " Batch id: { batch_id } \n "
f " Rubric: { json . dumps ( rubric , indent = 2 ) } \n "
f " Draft payload: { json . dumps ( draft_payload , indent = 2 ) } \n "
" Tweets: \n "
f " { format_tweets_for_prompt ( batch_rows ) } \n "
)
def build_weekly_insight_prompt ( profile , recent_batches ) :
return (
" You are Timmy preparing a private weekly insight brief about Alexander. \n "
" Use the profile plus recent batch deltas to produce grounded, actionable insights. \n "
" Return ONLY valid JSON with this schema: \n "
' { '
' " markdown_report " : " ... " , '
' " opportunities " :[ { '
' " id " : " ... " , '
' " theme " : " ... " , '
' " insight " : " ... " , '
' " why_it_matters " : " ... " , '
' " evidence_tweet_ids " :[ " ... " ], '
' " suggested_action " : " ... " , '
' " confidence " :0.0, '
' " time_horizon " : " this week|this month|long-term " '
' }] '
' } \n \n '
f " Profile: { json . dumps ( archive_profile_summary ( profile ) , indent = 2 ) } \n "
f " Recent batches: { json . dumps ( recent_batches , indent = 2 ) } \n "
)
def latest_eval_gate ( ) :
latest_eval = latest_path ( ARCHIVE_TRAINING_EVALS_DIR , " run_*.json " )
if not latest_eval :
return None
return run_timmy_home_module (
" scripts.twitter_archive.evaluate_candidate " ,
args = [ " --eval-file " , str ( latest_eval ) ] ,
timeout = 60 ,
)
def training_command_env ( ) :
return {
" TIMMY_ARCHIVE_DIR " : str ( ARCHIVE_DIR ) ,
" TIMMY_HOME " : str ( TIMMY_HOME ) ,
}
def _archive_extract_impl ( ) :
return run_timmy_home_module ( " scripts.twitter_archive.extract_archive " )
2026-03-27 16:00:29 -04:00
@huey.task ( )
2026-03-27 18:09:28 -04:00
def archive_extract ( ) :
""" Deterministically extract tweets.js into the private JSONL workspace. """
return _archive_extract_impl ( )
2026-03-27 16:00:29 -04:00
2026-03-27 18:09:28 -04:00
def _archive_profile_consolidate_impl ( ) :
checkpoint = load_archive_checkpoint ( )
result = run_timmy_home_module ( " scripts.twitter_archive.consolidate_profile " )
if result . get ( " status " ) == " ok " :
checkpoint [ " last_profile_update " ] = datetime . now ( timezone . utc ) . isoformat ( )
write_json ( ARCHIVE_CHECKPOINT , checkpoint )
return result
2026-03-27 16:00:29 -04:00
2026-03-27 18:09:28 -04:00
@huey.task ( )
def archive_profile_consolidate ( ) :
""" Merge batch candidate files into a deterministic archive profile. """
return _archive_profile_consolidate_impl ( )
def _archive_dpo_build_impl ( ) :
checkpoint = load_archive_checkpoint ( )
result = run_timmy_home_module ( " scripts.twitter_archive.build_dpo_pairs " )
if result . get ( " status " ) == " ok " :
checkpoint [ " last_dpo_build " ] = datetime . now ( timezone . utc ) . isoformat ( )
write_json ( ARCHIVE_CHECKPOINT , checkpoint )
return result
@huey.task ( )
def archive_dpo_build ( ) :
""" Build local-only DPO pairs from completed archive batches. """
return _archive_dpo_build_impl ( )
def _archive_pipeline_health_impl ( ) :
result = run_timmy_home_module ( " scripts.twitter_archive.pipeline_health " )
latest_session = latest_path ( HERMES_HOME / " sessions " , " session_*.json " )
latest_dpo = latest_path ( ARCHIVE_TRAINING_DPO_DIR , " pairs_*.jsonl " )
if latest_session :
result [ " latest_session " ] = latest_session . name
if latest_dpo :
result [ " latest_dpo_file " ] = latest_dpo . name
if latest_session and latest_dpo and latest_session . stat ( ) . st_mtime > latest_dpo . stat ( ) . st_mtime :
issues = result . setdefault ( " issues " , [ ] )
issues . append ( " latest Hermes session is newer than latest archive DPO file " )
result [ " ok " ] = False
result [ " progress " ] = archive_progress_snapshot ( )
return result
@huey.task ( )
def archive_pipeline_health ( ) :
""" Check the private archive pipeline for stalled or missing stages. """
return _archive_pipeline_health_impl ( )
def _know_thy_father_impl ( ) :
ensure_archive_layout ( )
extraction = _archive_extract_impl ( )
if extraction . get ( " status " ) != " ok " :
return { " status " : " error " , " reason " : " archive extraction failed " , " extract " : extraction }
checkpoint = load_archive_checkpoint ( )
tweets = load_jsonl ( ARCHIVE_TWEETS_FILE )
if not tweets :
return { " status " : " error " , " reason " : " no extracted tweets found " }
offset = int ( checkpoint . get ( " next_offset " , 0 ) or 0 )
if offset > = len ( tweets ) :
return {
" status " : " complete " ,
" batches_completed " : checkpoint . get ( " batches_completed " , 0 ) ,
" tweet_count " : len ( tweets ) ,
" progress " : archive_progress_snapshot ( ) ,
}
batch_rows = tweets [ offset : offset + ARCHIVE_BATCH_SIZE ]
batch_number = int ( checkpoint . get ( " batches_completed " , 0 ) or 0 ) + 1
batch_id = archive_batch_id ( batch_number )
batch_tweet_ids = [ str ( row . get ( " tweet_id " ) ) for row in batch_rows ]
profile = read_json ( ARCHIVE_PROFILE_FILE , { " claims " : [ ] } )
previous_note = " "
previous_batch = checkpoint . get ( " last_batch_id " )
if previous_batch :
previous_note_path = ARCHIVE_NOTES_DIR / f " { previous_batch } .md "
if previous_note_path . exists ( ) :
previous_note = previous_note_path . read_text ( )
draft_prompt = build_archive_draft_prompt (
batch_id = batch_id ,
checkpoint = checkpoint ,
profile = profile ,
prior_note = previous_note ,
batch_rows = batch_rows ,
)
2026-03-27 18:48:36 -04:00
draft_run = run_archive_hermes (
2026-03-27 18:09:28 -04:00
prompt = draft_prompt ,
caller_tag = f " know-thy-father-draft: { batch_id } " ,
2026-03-27 16:00:29 -04:00
)
2026-03-27 18:09:28 -04:00
if not draft_run :
return { " status " : " error " , " reason " : " draft pass failed " }
2026-03-27 16:00:29 -04:00
2026-03-27 18:09:28 -04:00
write_text ( ARCHIVE_TRAINING_RUNS_DIR / f " { batch_id } _draft.txt " , draft_run [ " response " ] )
try :
draft_payload = extract_first_json_object ( draft_run [ " response " ] )
except ValueError :
return { " status " : " error " , " reason " : " draft pass did not return JSON " , " batch_id " : batch_id }
critique_prompt = build_archive_critique_prompt ( batch_id = batch_id , draft_payload = draft_payload , batch_rows = batch_rows )
2026-03-27 18:48:36 -04:00
critique_run = run_archive_hermes (
2026-03-27 18:09:28 -04:00
prompt = critique_prompt ,
caller_tag = f " know-thy-father-critique: { batch_id } " ,
)
if not critique_run :
return { " status " : " error " , " reason " : " critique pass failed " , " batch_id " : batch_id }
2026-03-27 16:00:29 -04:00
2026-03-27 18:09:28 -04:00
write_text ( ARCHIVE_TRAINING_RUNS_DIR / f " { batch_id } _critique.txt " , critique_run [ " response " ] )
2026-03-27 16:00:29 -04:00
try :
2026-03-27 18:09:28 -04:00
critique_payload = extract_first_json_object ( critique_run [ " response " ] )
except ValueError :
return { " status " : " error " , " reason " : " critique pass did not return JSON " , " batch_id " : batch_id }
notes_markdown = str ( critique_payload . get ( " notes_markdown " ) or " " ) . strip ( )
if not notes_markdown :
return { " status " : " error " , " reason " : " critique output missing notes " , " batch_id " : batch_id }
knowledge_candidates = [ ]
for index , candidate in enumerate ( critique_payload . get ( " knowledge_candidates " , [ ] ) , start = 1 ) :
normalized = normalize_candidate_entry ( candidate , batch_id , index )
if normalized :
knowledge_candidates . append ( normalized )
training_examples = normalize_training_examples (
critique_payload . get ( " training_examples " , [ ] ) ,
batch_id = batch_id ,
tweet_ids = batch_tweet_ids ,
fallback_prompt = " Read this batch of Alexander ' s tweets and write grounded notes with evidence. " ,
fallback_response = notes_markdown ,
)
note_body = (
f " # { batch_id } \n \n "
f " - Batch number: { batch_number } \n "
f " - Tweet range: { offset } to { offset + len ( batch_rows ) - 1 } \n "
f " - Tweet ids: { ' , ' . join ( batch_tweet_ids ) } \n \n "
f " { notes_markdown } \n "
)
write_text ( ARCHIVE_NOTES_DIR / f " { batch_id } .md " , note_body )
write_jsonl ( ARCHIVE_TRAINING_EXAMPLES_DIR / f " { batch_id } .jsonl " , training_examples )
batch_payload = {
" batch_id " : batch_id ,
" batch_number " : batch_number ,
" tweet_ids " : batch_tweet_ids ,
" prompt " : draft_prompt ,
" rejected " : str ( draft_payload . get ( " notes_markdown " ) or draft_run [ " response " ] ) . strip ( ) ,
" chosen " : notes_markdown ,
" draft_session_id " : draft_run . get ( " session_id " ) ,
" critique_session_id " : critique_run . get ( " session_id " ) ,
" rubric_scores " : normalize_rubric_scores ( critique_payload . get ( " rubric_scores " , { } ) ) ,
" knowledge_candidates " : knowledge_candidates ,
" training_examples " : training_examples ,
" phase " : str ( critique_payload . get ( " phase " ) or checkpoint . get ( " phase " ) or " discovery " ) ,
" confidence " : str ( critique_payload . get ( " confidence " ) or checkpoint . get ( " confidence " ) or " low " ) ,
" next_focus " : str ( critique_payload . get ( " next_focus " ) or checkpoint . get ( " next_focus " ) or " " ) ,
" draft_response_file " : f " { batch_id } _draft.txt " ,
" critique_response_file " : f " { batch_id } _critique.txt " ,
}
write_json ( ARCHIVE_CANDIDATES_DIR / f " { batch_id } .json " , batch_payload )
checkpoint [ " next_offset " ] = offset + len ( batch_rows )
checkpoint [ " batches_completed " ] = batch_number
checkpoint [ " phase " ] = batch_payload [ " phase " ]
checkpoint [ " confidence " ] = batch_payload [ " confidence " ]
checkpoint [ " next_focus " ] = batch_payload [ " next_focus " ]
checkpoint [ " understanding_version " ] = batch_number
checkpoint [ " last_batch_id " ] = batch_id
checkpoint [ " last_batch_sessions " ] = {
" draft " : draft_run . get ( " session_id " ) ,
" critique " : critique_run . get ( " session_id " ) ,
}
write_json ( ARCHIVE_CHECKPOINT , checkpoint )
profile_result = _archive_profile_consolidate_impl ( )
dpo_result = _archive_dpo_build_impl ( )
health_result = _archive_pipeline_health_impl ( )
return {
" status " : " ok " ,
" batch_id " : batch_id ,
" batch_number " : batch_number ,
" tweets_processed " : len ( batch_rows ) ,
" next_offset " : checkpoint [ " next_offset " ] ,
" knowledge_candidates " : len ( knowledge_candidates ) ,
" training_examples " : len ( training_examples ) ,
" profile " : profile_result ,
" dpo " : dpo_result ,
" health " : health_result ,
}
2026-03-27 16:00:29 -04:00
2026-03-27 18:09:28 -04:00
@huey.task ( )
@huey.lock_task ( " know-thy-father " )
def know_thy_father ( ) :
""" Process one explicit 50-tweet archive batch into private learning artifacts. """
return _know_thy_father_impl ( )
def _archive_weekly_insights_impl ( ) :
ensure_archive_layout ( )
profile = read_json ( ARCHIVE_PROFILE_FILE , { " claims " : [ ] } )
if not profile . get ( " claims " ) :
return { " status " : " error " , " reason " : " profile is empty; run know_thy_father first " }
recent_batches = [ ]
for path in sorted ( ARCHIVE_CANDIDATES_DIR . glob ( " batch_*.json " ) ) [ - 3 : ] :
batch = read_json ( path , { } )
recent_batches . append (
{
" batch_id " : batch . get ( " batch_id " , path . stem ) ,
" tweet_ids " : batch . get ( " tweet_ids " , [ ] ) [ : 10 ] ,
" next_focus " : batch . get ( " next_focus " ) ,
" knowledge_candidates " : batch . get ( " knowledge_candidates " , [ ] ) [ : 5 ] ,
}
)
prompt = build_weekly_insight_prompt ( profile = profile , recent_batches = recent_batches )
2026-03-27 18:48:36 -04:00
insight_run = run_archive_hermes ( prompt = prompt , caller_tag = " archive-weekly-insights " )
2026-03-27 18:09:28 -04:00
if not insight_run :
return { " status " : " error " , " reason " : " insight pass failed " }
try :
insight_payload = extract_first_json_object ( insight_run [ " response " ] )
except ValueError :
return { " status " : " error " , " reason " : " insight pass did not return JSON " }
date_key = datetime . now ( timezone . utc ) . strftime ( " % Y % m %d " )
weekly_file = ARCHIVE_INSIGHTS_DIR / f " weekly_ { date_key } .md "
opportunities_file = ARCHIVE_INSIGHTS_DIR / " opportunities.json "
markdown_report = str ( insight_payload . get ( " markdown_report " ) or " " ) . strip ( )
opportunities = [ ]
for item in insight_payload . get ( " opportunities " , [ ] ) :
opportunity = {
" id " : str ( item . get ( " id " ) or f " opportunity- { len ( opportunities ) + 1 } " ) . strip ( ) ,
" theme " : str ( item . get ( " theme " ) or " " ) . strip ( ) ,
" insight " : str ( item . get ( " insight " ) or " " ) . strip ( ) ,
" why_it_matters " : str ( item . get ( " why_it_matters " ) or " " ) . strip ( ) ,
" evidence_tweet_ids " : [ str ( tweet_id ) for tweet_id in item . get ( " evidence_tweet_ids " , [ ] ) if str ( tweet_id ) . strip ( ) ] ,
" suggested_action " : str ( item . get ( " suggested_action " ) or " " ) . strip ( ) ,
" confidence " : round ( float ( item . get ( " confidence " , 0.0 ) or 0.0 ) , 3 ) ,
" time_horizon " : str ( item . get ( " time_horizon " ) or " this week " ) . strip ( ) ,
}
if opportunity [ " theme " ] and opportunity [ " insight " ] and opportunity [ " suggested_action " ] :
opportunities . append ( opportunity )
write_text ( weekly_file , markdown_report )
write_json ( opportunities_file , { " generated_at " : datetime . now ( timezone . utc ) . isoformat ( ) , " opportunities " : opportunities } )
checkpoint = load_archive_checkpoint ( )
checkpoint [ " last_insight_file " ] = weekly_file . name
write_json ( ARCHIVE_CHECKPOINT , checkpoint )
archive_progress_snapshot ( )
2026-03-27 16:00:29 -04:00
return {
" status " : " ok " ,
2026-03-27 18:09:28 -04:00
" weekly_file " : weekly_file . name ,
" opportunities " : len ( opportunities ) ,
" session_id " : insight_run . get ( " session_id " ) ,
}
@huey.task ( )
def archive_weekly_insights ( ) :
""" Generate the private weekly insight brief from the current profile. """
return _archive_weekly_insights_impl ( )
def _archive_train_adapter_impl ( ) :
ensure_archive_layout ( )
counts = archive_counts ( )
state = load_train_state ( )
eval_gate = latest_eval_gate ( )
if state . get ( " awaiting_eval " ) :
if not eval_gate or not eval_gate . get ( " pass " ) :
return {
" status " : " blocked " ,
" reason " : " latest candidate eval is missing or still red " ,
" last_candidate_id " : state . get ( " last_candidate_id " ) ,
" eval " : eval_gate ,
}
new_pairs = max ( 0 , counts [ " total_pairs " ] - int ( state . get ( " last_total_pairs " , 0 ) or 0 ) )
new_batches = max ( 0 , counts [ " total_batches " ] - int ( state . get ( " last_total_batches " , 0 ) or 0 ) )
if new_pairs < 200 and new_batches < 10 :
return {
" status " : " not-ready " ,
" new_pairs " : new_pairs ,
" new_batches " : new_batches ,
" threshold " : { " pairs " : 200 , " batches " : 10 } ,
}
pipeline_config = load_pipeline_config ( )
train_command = str ( pipeline_config . get ( " train_command " ) or " " ) . strip ( )
timestamp = datetime . now ( timezone . utc ) . strftime ( " % Y % m %d _ % H % M % S " )
candidate_id = f " timmy-archive- { timestamp } "
run_log = ARCHIVE_TRAINING_RUNS_DIR / f " train_ { timestamp } .log "
run_manifest = {
" status " : " ready " if not train_command else " started " ,
" candidate_id " : candidate_id ,
" new_pairs " : new_pairs ,
" new_batches " : new_batches ,
" train_command " : train_command or None ,
" created_at " : datetime . now ( timezone . utc ) . isoformat ( ) ,
}
if not train_command :
write_json ( ARCHIVE_TRAINING_RUNS_DIR / f " train_ { timestamp } .json " , run_manifest )
return run_manifest
env = os . environ . copy ( )
env . update ( training_command_env ( ) )
result = subprocess . run (
[ " /bin/zsh " , " -lc " , train_command ] ,
cwd = str ( TIMMY_HOME ) ,
capture_output = True ,
text = True ,
timeout = 3600 ,
env = env ,
)
run_log . write_text ( ( result . stdout or " " ) + ( " \n " + result . stderr if result . stderr else " " ) )
run_manifest [ " exit_code " ] = result . returncode
run_manifest [ " log_file " ] = run_log . name
run_manifest [ " status " ] = " ok " if result . returncode == 0 else " error "
write_json ( ARCHIVE_TRAINING_RUNS_DIR / f " train_ { timestamp } .json " , run_manifest )
if result . returncode == 0 :
state . update (
{
" last_total_batches " : counts [ " total_batches " ] ,
" last_total_pairs " : counts [ " total_pairs " ] ,
" last_candidate_id " : candidate_id ,
" awaiting_eval " : True ,
" last_run_status " : " ok " ,
" last_run_at " : datetime . now ( timezone . utc ) . isoformat ( ) ,
}
)
write_json ( ARCHIVE_TRAIN_STATE_FILE , state )
else :
state . update (
{
" last_run_status " : " error " ,
" last_run_at " : datetime . now ( timezone . utc ) . isoformat ( ) ,
}
)
write_json ( ARCHIVE_TRAIN_STATE_FILE , state )
return run_manifest
@huey.task ( )
def archive_train_adapter ( ) :
""" Train an archive-reading adapter when DPO thresholds and eval gates allow. """
return _archive_train_adapter_impl ( )
def _archive_promote_candidate_impl ( ) :
eval_gate = latest_eval_gate ( )
if not eval_gate :
return { " status " : " blocked " , " reason " : " missing eval file " }
if not eval_gate . get ( " pass " ) :
write_json (
ARCHIVE_PROMOTION_STATE_FILE ,
{
" status " : " blocked " ,
" reason " : " promotion gate failed " ,
" evaluated_at " : datetime . now ( timezone . utc ) . isoformat ( ) ,
" eval " : eval_gate ,
} ,
)
return { " status " : " blocked " , " eval " : eval_gate }
pipeline_config = load_pipeline_config ( )
promote_command = str ( pipeline_config . get ( " promote_command " ) or " " ) . strip ( )
timestamp = datetime . now ( timezone . utc ) . strftime ( " % Y % m %d _ % H % M % S " )
decision = {
" status " : " ready " if not promote_command else " started " ,
" candidate_id " : eval_gate . get ( " candidate_id " ) ,
" rollback_model " : eval_gate . get ( " rollback_model " ) ,
" evaluated_at " : datetime . now ( timezone . utc ) . isoformat ( ) ,
" eval " : eval_gate ,
}
if promote_command :
env = os . environ . copy ( )
env . update ( training_command_env ( ) )
env [ " TIMMY_ARCHIVE_CANDIDATE_ID " ] = str ( eval_gate . get ( " candidate_id " ) or " " )
result = subprocess . run (
[ " /bin/zsh " , " -lc " , promote_command ] ,
cwd = str ( TIMMY_HOME ) ,
capture_output = True ,
text = True ,
timeout = 1200 ,
env = env ,
)
log_path = ARCHIVE_TRAINING_RUNS_DIR / f " promote_ { timestamp } .log "
log_path . write_text ( ( result . stdout or " " ) + ( " \n " + result . stderr if result . stderr else " " ) )
decision [ " status " ] = " ok " if result . returncode == 0 else " error "
decision [ " exit_code " ] = result . returncode
decision [ " log_file " ] = log_path . name
if result . returncode != 0 :
write_json ( ARCHIVE_PROMOTION_STATE_FILE , decision )
return decision
write_json (
ARCHIVE_ACTIVE_MODEL_FILE ,
{
" candidate_id " : eval_gate . get ( " candidate_id " ) ,
" rollback_model " : eval_gate . get ( " rollback_model " ) ,
" promoted_at " : datetime . now ( timezone . utc ) . isoformat ( ) ,
} ,
)
write_json ( ARCHIVE_PROMOTION_STATE_FILE , decision )
state = load_train_state ( )
state [ " awaiting_eval " ] = False
state [ " last_run_status " ] = " promoted "
write_json ( ARCHIVE_TRAIN_STATE_FILE , state )
return decision
@huey.task ( )
def archive_promote_candidate ( ) :
""" Promote an archive candidate model only when offline eval gates pass. """
return _archive_promote_candidate_impl ( )
@huey.periodic_task ( crontab ( hour = " */4 " , minute = " 15 " ) )
def archive_pipeline_tick ( ) :
""" Advance the private archive learning loop on a regular cadence. """
batch = _know_thy_father_impl ( )
train = _archive_train_adapter_impl ( )
promote = _archive_promote_candidate_impl ( )
insight = { " status " : " skipped " }
if datetime . now ( timezone . utc ) . weekday ( ) == 0 :
expected = f " weekly_ { datetime . now ( timezone . utc ) . strftime ( ' % Y % m %d ' ) } .md "
if not ( ARCHIVE_INSIGHTS_DIR / expected ) . exists ( ) :
insight = _archive_weekly_insights_impl ( )
return {
" batch " : batch ,
" train " : train ,
" promote " : promote ,
" insight " : insight ,
" health " : _archive_pipeline_health_impl ( ) ,
2026-03-27 16:00:29 -04:00
}
2026-03-25 19:25:01 -04:00
# ── Existing: Orchestration ──────────────────────────────────────────
2026-03-25 21:15:36 +00:00
@huey.periodic_task ( crontab ( minute = " */15 " ) )
2026-03-25 21:12:26 +00:00
def triage_issues ( ) :
2026-03-27 22:19:19 -04:00
""" Passively scan unassigned issues without posting comment spam. """
2026-03-25 21:12:26 +00:00
g = GiteaClient ( )
2026-03-27 22:19:19 -04:00
backlog = [ ]
2026-03-25 21:12:26 +00:00
for repo in REPOS :
for issue in g . find_unassigned_issues ( repo , limit = 10 ) :
2026-03-27 22:19:19 -04:00
backlog . append ( {
" repo " : repo ,
" issue " : issue . number ,
" title " : issue . title ,
} )
return { " unassigned " : len ( backlog ) , " sample " : backlog [ : 20 ] }
2026-03-25 21:12:26 +00:00
2026-03-25 21:15:36 +00:00
@huey.periodic_task ( crontab ( minute = " */30 " ) )
2026-03-25 21:12:26 +00:00
def review_prs ( ) :
2026-03-25 21:15:36 +00:00
""" Review open PRs: check net diff, reject violations. """
2026-03-25 21:12:26 +00:00
g = GiteaClient ( )
2026-03-25 21:15:36 +00:00
reviewed , rejected = 0 , 0
2026-03-25 21:12:26 +00:00
for repo in REPOS :
for pr in g . list_pulls ( repo , state = " open " , limit = 20 ) :
2026-03-25 21:15:36 +00:00
reviewed + = 1
2026-03-25 21:12:26 +00:00
files = g . get_pull_files ( repo , pr . number )
net = sum ( f . additions - f . deletions for f in files )
if net > NET_LINE_LIMIT :
2026-03-25 21:15:36 +00:00
rejected + = 1
2026-03-25 21:12:26 +00:00
g . create_comment (
repo , pr . number ,
f " ❌ Net + { net } lines exceeds the { NET_LINE_LIMIT } -line limit. "
f " Find { net - NET_LINE_LIMIT } lines to cut. See CONTRIBUTING.md. "
)
2026-03-25 21:15:36 +00:00
return { " reviewed " : reviewed , " rejected " : rejected }
2026-03-25 21:12:26 +00:00
2026-03-25 21:15:36 +00:00
@huey.periodic_task ( crontab ( minute = " */10 " ) )
2026-03-25 21:12:26 +00:00
def dispatch_assigned ( ) :
2026-03-25 21:15:36 +00:00
""" Pick up issues assigned to agents and kick off work. """
2026-03-25 21:12:26 +00:00
g = GiteaClient ( )
agents = [ " claude " , " gemini " , " kimi " , " grok " , " perplexity " ]
2026-03-25 21:15:36 +00:00
dispatched = 0
2026-03-25 21:12:26 +00:00
for repo in REPOS :
for agent in agents :
2026-03-25 21:15:36 +00:00
for issue in g . find_agent_issues ( repo , agent , limit = 5 ) :
2026-03-26 08:22:53 -04:00
comments = g . list_comments ( repo , issue . number )
2026-03-25 21:15:36 +00:00
if any ( c . body and " dispatched " in c . body . lower ( ) for c in comments ) :
continue
dispatch_work ( repo , issue . number , agent )
dispatched + = 1
return { " dispatched " : dispatched }
2026-03-25 21:12:26 +00:00
@huey.task ( retries = 3 , retry_delay = 60 )
def dispatch_work ( repo , issue_number , agent ) :
""" Dispatch a single issue to an agent. Huey handles retry. """
g = GiteaClient ( )
g . create_comment (
repo , issue_number ,
2026-03-25 21:15:36 +00:00
f " ⚡ Dispatched to ` { agent } `. Huey task queued. "
2026-03-25 21:12:26 +00:00
)
2026-03-25 19:25:01 -04:00
# ── NEW 1: Config Sync ───────────────────────────────────────────────
@huey.periodic_task ( crontab ( minute = " 0 " ) ) # every hour on the hour
def sync_config_up ( ) :
""" Push live ~/.hermes config changes UP to timmy-config repo. """
script = TIMMY_HOME / " timmy-config " / " bin " / " sync-up.sh "
if not script . exists ( ) :
return { " error " : " sync-up.sh not found " }
result = subprocess . run (
[ " bash " , str ( script ) ] ,
capture_output = True , text = True , timeout = 60
)
return {
" exit_code " : result . returncode ,
" output " : result . stdout [ - 500 : ] if result . stdout else " " ,
" error " : result . stderr [ - 200 : ] if result . stderr else " " ,
}
# ── NEW 2: Session Export for DPO ────────────────────────────────────
@huey.periodic_task ( crontab ( hour = " */4 " , minute = " 30 " ) ) # every 4 hours
def session_export ( ) :
""" Scan recent sessions, extract conversation pairs for DPO training. """
sessions_dir = HERMES_HOME / " sessions "
export_dir = TIMMY_HOME / " training-data " / " dpo-pairs "
export_dir . mkdir ( parents = True , exist_ok = True )
marker_file = export_dir / " .last_export "
last_export = " "
if marker_file . exists ( ) :
last_export = marker_file . read_text ( ) . strip ( )
exported = 0
session_files = sorted ( sessions_dir . glob ( " session_*.json " ) )
for sf in session_files :
if sf . name < = last_export :
continue
try :
data = json . loads ( sf . read_text ( ) )
messages = data . get ( " messages " , [ ] )
# Extract user->assistant pairs (raw material for DPO curation)
pairs = [ ]
for i , msg in enumerate ( messages ) :
if msg . get ( " role " ) == " user " and i + 1 < len ( messages ) :
next_msg = messages [ i + 1 ]
if next_msg . get ( " role " ) == " assistant " :
pairs . append ( {
" prompt " : msg . get ( " content " , " " ) [ : 2000 ] ,
" chosen " : next_msg . get ( " content " , " " ) [ : 2000 ] ,
" session " : sf . name ,
} )
if pairs :
out_file = export_dir / sf . name
out_file . write_text ( json . dumps ( pairs , indent = 2 ) )
exported + = 1
except ( json . JSONDecodeError , KeyError ) :
continue
# Update marker
if session_files :
marker_file . write_text ( session_files [ - 1 ] . name )
return { " exported " : exported , " total_sessions " : len ( session_files ) }
# ── NEW 3: Model Health Check ────────────────────────────────────────
@huey.periodic_task ( crontab ( minute = " */5 " ) ) # every 5 minutes
def model_health ( ) :
2026-03-27 17:35:07 -04:00
""" Check the active local inference surface and export freshness. """
2026-03-25 19:25:01 -04:00
checks = { }
2026-03-27 17:35:07 -04:00
models_url = f " { LOCAL_PROVIDER_BASE_URL } /models "
chat_url = f " { LOCAL_PROVIDER_BASE_URL } /chat/completions "
checks [ " provider " ] = " local-llama.cpp "
checks [ " provider_base_url " ] = LOCAL_PROVIDER_BASE_URL
checks [ " provider_model " ] = LOCAL_PROVIDER_MODEL
2026-03-25 19:25:01 -04:00
2026-03-27 17:35:07 -04:00
# 1. Is the local inference process running?
2026-03-25 19:25:01 -04:00
try :
result = subprocess . run (
2026-03-27 17:35:07 -04:00
[ " pgrep " , " -f " , " llama-server|ollama " ] ,
2026-03-25 19:25:01 -04:00
capture_output = True , timeout = 5
)
2026-03-27 17:35:07 -04:00
checks [ " local_inference_running " ] = result . returncode == 0
2026-03-25 19:25:01 -04:00
except Exception :
2026-03-27 17:35:07 -04:00
checks [ " local_inference_running " ] = False
2026-03-25 19:25:01 -04:00
2026-03-27 17:35:07 -04:00
# 2. Can we hit the configured API?
2026-03-25 19:25:01 -04:00
try :
import urllib . request
2026-03-27 17:35:07 -04:00
req = urllib . request . Request ( models_url )
2026-03-25 19:25:01 -04:00
with urllib . request . urlopen ( req , timeout = 5 ) as resp :
data = json . loads ( resp . read ( ) )
2026-03-27 17:35:07 -04:00
models = [ m . get ( " id " , " ? " ) for m in data . get ( " data " , [ ] ) ]
2026-03-25 19:25:01 -04:00
checks [ " models_loaded " ] = models
checks [ " api_responding " ] = True
except Exception as e :
checks [ " api_responding " ] = False
checks [ " error " ] = str ( e )
# 3. Can we do a tiny inference?
if checks . get ( " api_responding " ) :
try :
payload = json . dumps ( {
2026-03-27 17:35:07 -04:00
" model " : LOCAL_PROVIDER_MODEL ,
2026-03-25 19:25:01 -04:00
" messages " : [ { " role " : " user " , " content " : " ping " } ] ,
" max_tokens " : 5 ,
" stream " : False ,
} ) . encode ( )
req = urllib . request . Request (
2026-03-27 17:35:07 -04:00
chat_url ,
2026-03-25 19:25:01 -04:00
data = payload ,
headers = { " Content-Type " : " application/json " } ,
)
with urllib . request . urlopen ( req , timeout = 30 ) as resp :
checks [ " inference_ok " ] = resp . status == 200
except Exception as e :
checks [ " inference_ok " ] = False
checks [ " inference_error " ] = str ( e )
2026-03-27 17:35:07 -04:00
# 4. Is session export keeping up with new Hermes sessions?
sessions_dir = HERMES_HOME / " sessions "
export_dir = TIMMY_HOME / " training-data " / " dpo-pairs "
latest_session = newest_file ( sessions_dir , " session_*.json " )
latest_export = newest_file ( export_dir , " session_*.json " )
checks [ " latest_session " ] = latest_session . name if latest_session else None
checks [ " latest_export " ] = latest_export . name if latest_export else None
if latest_session and latest_export :
session_mtime = latest_session . stat ( ) . st_mtime
export_mtime = latest_export . stat ( ) . st_mtime
lag_minutes = max ( 0 , int ( ( session_mtime - export_mtime ) / / 60 ) )
checks [ " export_lag_minutes " ] = lag_minutes
checks [ " export_fresh " ] = lag_minutes < = 300
elif latest_session and not latest_export :
checks [ " export_lag_minutes " ] = None
checks [ " export_fresh " ] = False
else :
checks [ " export_lag_minutes " ] = 0
checks [ " export_fresh " ] = True
2026-03-25 19:25:01 -04:00
# Write health status to a file for other tools to read
health_file = HERMES_HOME / " model_health.json "
checks [ " timestamp " ] = datetime . now ( timezone . utc ) . isoformat ( )
health_file . write_text ( json . dumps ( checks , indent = 2 ) )
return checks
# ── NEW 4: Heartbeat Tick ────────────────────────────────────────────
@huey.periodic_task ( crontab ( minute = " */10 " ) ) # every 10 minutes
def heartbeat_tick ( ) :
""" Perceive — Reflect — Remember — Decide — Act — Learn.
This is the nervous system . Each tick :
1. Perceive : gather state ( Gitea activity , model health , open issues )
2. Reflect : what changed since last tick ?
3. Remember : log perception to episodic memory
4. Decide : anything need action ?
5. Act : create comments , close issues , alert
6. Learn : log outcome for training data
"""
tick_dir = TIMMY_HOME / " heartbeat "
tick_dir . mkdir ( parents = True , exist_ok = True )
now = datetime . now ( timezone . utc )
tick_id = now . strftime ( " % Y % m %d _ % H % M % S " )
perception = { }
# PERCEIVE: gather state
try :
g = GiteaClient ( )
perception [ " gitea_alive " ] = g . ping ( )
except Exception :
perception [ " gitea_alive " ] = False
# Model health (read from health file)
health_file = HERMES_HOME / " model_health.json "
if health_file . exists ( ) :
try :
perception [ " model_health " ] = json . loads ( health_file . read_text ( ) )
except Exception :
perception [ " model_health " ] = " unreadable "
# Open issue/PR counts
if perception . get ( " gitea_alive " ) :
try :
g = GiteaClient ( )
for repo in REPOS :
issues = g . list_issues ( repo , state = " open " , limit = 1 )
pulls = g . list_pulls ( repo , state = " open " , limit = 1 )
perception [ repo ] = {
" open_issues " : len ( issues ) ,
" open_prs " : len ( pulls ) ,
}
except Exception as e :
perception [ " gitea_error " ] = str ( e )
# Huey consumer alive (we're running, so yes)
perception [ " huey_alive " ] = True
# REFLECT + REMEMBER: compare to last tick, log
last_tick_file = tick_dir / " last_tick.json "
last_tick = { }
if last_tick_file . exists ( ) :
try :
last_tick = json . loads ( last_tick_file . read_text ( ) )
except Exception :
pass
tick_record = {
" tick_id " : tick_id ,
" timestamp " : now . isoformat ( ) ,
" perception " : perception ,
" previous_tick " : last_tick . get ( " tick_id " , " none " ) ,
}
2026-03-26 17:00:22 -04:00
# DECIDE: let hermes4:14b reason about what to do
decide_prompt = (
f " System state at { now . isoformat ( ) } : \n \n "
f " { json . dumps ( perception , indent = 2 ) } \n \n "
f " Previous tick: { last_tick . get ( ' tick_id ' , ' none ' ) } \n \n "
" You are the heartbeat monitor. Based on this state: \n "
" 1. List any actions needed (alerts, restarts, escalations). Empty if all OK. \n "
" 2. Rate severity: ok, warning, or critical. \n "
" 3. One sentence of reasoning. \n \n "
' Respond ONLY with JSON: { " actions " : [], " severity " : " ok " , " reasoning " : " ... " } '
)
decision = None
try :
raw = hermes_local ( decide_prompt , caller_tag = " heartbeat_tick " )
if raw :
# Model might wrap JSON in markdown, extract first { line
for line in raw . split ( " \n " ) :
line = line . strip ( )
if line . startswith ( " { " ) :
decision = json . loads ( line )
break
if not decision :
decision = json . loads ( raw )
except ( json . JSONDecodeError , Exception ) :
decision = None
# Fallback to hardcoded logic if model fails or is down
if decision is None :
actions = [ ]
if not perception . get ( " gitea_alive " ) :
actions . append ( " ALERT: Gitea unreachable " )
health = perception . get ( " model_health " , { } )
2026-03-27 18:09:28 -04:00
if isinstance ( health , dict ) and not health . get ( " local_inference_running " ) :
actions . append ( " ALERT: local inference surface not running " )
2026-03-26 17:00:22 -04:00
decision = {
" actions " : actions ,
" severity " : " fallback " ,
" reasoning " : " model unavailable, used hardcoded checks " ,
}
tick_record [ " decision " ] = decision
actions = decision . get ( " actions " , [ ] )
2026-03-25 19:25:01 -04:00
# Save tick
last_tick_file . write_text ( json . dumps ( tick_record , indent = 2 ) )
# LEARN: append to episodic log
log_file = tick_dir / f " ticks_ { now . strftime ( ' % Y % m %d ' ) } .jsonl "
with open ( log_file , " a " ) as f :
f . write ( json . dumps ( tick_record ) + " \n " )
return tick_record
# ── NEW 5: Memory Compress (Morning Briefing) ───────────────────────
@huey.periodic_task ( crontab ( hour = " 8 " , minute = " 0 " ) ) # 8 AM daily
def memory_compress ( ) :
""" Morning briefing — compress recent heartbeat ticks into summary.
Reads yesterday ' s tick log, compresses into a briefing file
that can be injected into system prompt at startup .
"""
tick_dir = TIMMY_HOME / " heartbeat "
briefing_dir = TIMMY_HOME / " briefings "
briefing_dir . mkdir ( parents = True , exist_ok = True )
# Find yesterday's tick log
from datetime import timedelta
yesterday = ( datetime . now ( timezone . utc ) - timedelta ( days = 1 ) ) . strftime ( " % Y % m %d " )
tick_log = tick_dir / f " ticks_ { yesterday } .jsonl "
if not tick_log . exists ( ) :
return { " status " : " no ticks from yesterday " }
# Read all ticks
ticks = [ ]
for line in tick_log . read_text ( ) . strip ( ) . split ( " \n " ) :
try :
ticks . append ( json . loads ( line ) )
except Exception :
continue
if not ticks :
return { " status " : " empty tick log " }
# Compress: extract key facts
alerts = [ ]
gitea_down_count = 0
2026-03-27 18:09:28 -04:00
inference_down_count = 0
2026-03-25 19:25:01 -04:00
for t in ticks :
for action in t . get ( " actions " , [ ] ) :
alerts . append ( f " [ { t [ ' tick_id ' ] } ] { action } " )
p = t . get ( " perception " , { } )
if not p . get ( " gitea_alive " ) :
gitea_down_count + = 1
health = p . get ( " model_health " , { } )
2026-03-27 18:09:28 -04:00
if isinstance ( health , dict ) and not health . get ( " local_inference_running " ) :
inference_down_count + = 1
2026-03-25 19:25:01 -04:00
# Last tick's perception = current state
last = ticks [ - 1 ] . get ( " perception " , { } )
briefing = {
" date " : yesterday ,
" total_ticks " : len ( ticks ) ,
" alerts " : alerts [ - 10 : ] , # last 10 alerts
" gitea_downtime_ticks " : gitea_down_count ,
2026-03-27 18:09:28 -04:00
" local_inference_downtime_ticks " : inference_down_count ,
2026-03-25 19:25:01 -04:00
" last_known_state " : last ,
}
briefing_file = briefing_dir / f " briefing_ { yesterday } .json "
briefing_file . write_text ( json . dumps ( briefing , indent = 2 ) )
return briefing
2026-03-25 21:29:43 -04:00
# ── NEW 6: Good Morning Report ───────────────────────────────────────
@huey.periodic_task ( crontab ( hour = " 6 " , minute = " 0 " ) ) # 6 AM daily
def good_morning_report ( ) :
2026-03-28 22:00:56 -04:00
""" Generate Alexander ' s official morning report.
Delivery contract :
- save markdown + beautiful HTML locally
- open the HTML report in the browser on the Mac
- send the full markdown artifact to Telegram plus a readable summary message
- keep claims evidence - rich and honest
2026-03-25 21:29:43 -04:00
"""
2026-03-28 22:00:56 -04:00
now = datetime . now ( ) . astimezone ( )
2026-03-25 21:29:43 -04:00
today = now . strftime ( " % Y- % m- %d " )
day_name = now . strftime ( " % A " )
2026-03-28 22:00:56 -04:00
today_tick_slug = now . strftime ( " % Y % m %d " )
2026-03-25 21:29:43 -04:00
g = GiteaClient ( )
2026-03-28 22:00:56 -04:00
tick_log = TIMMY_HOME / " heartbeat " / f " ticks_ { today_tick_slug } .jsonl "
ticks = read_jsonl_rows ( tick_log )
tick_count = len ( ticks )
gitea_downtime_ticks = sum ( 1 for tick in ticks if not ( tick . get ( " perception " , { } ) or { } ) . get ( " gitea_alive " , True ) )
inference_fail_ticks = sum (
1
for tick in ticks
if not ( ( tick . get ( " perception " , { } ) or { } ) . get ( " model_health " , { } ) or { } ) . get ( " inference_ok " , False )
)
first_green_tick = next (
(
tick . get ( " tick_id " )
for tick in ticks
if ( ( tick . get ( " perception " , { } ) or { } ) . get ( " model_health " , { } ) or { } ) . get ( " inference_ok " , False )
) ,
" none " ,
)
2026-03-25 21:29:43 -04:00
health_file = HERMES_HOME / " model_health.json "
2026-03-28 22:00:56 -04:00
model_health = read_json ( health_file , { } )
provider = model_health . get ( " provider " , " unknown " )
provider_model = model_health . get ( " provider_model " , " unknown " )
provider_base_url = model_health . get ( " provider_base_url " , " unknown " )
model_status = " healthy " if model_health . get ( " inference_ok " ) else " degraded "
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
huey_line = " not found "
try :
huey_ps = subprocess . run (
[ " bash " , " -lc " , " ps aux | egrep ' huey_consumer|tasks.huey ' | grep -v egrep || true " ] ,
capture_output = True ,
text = True ,
timeout = 10 ,
)
huey_line = huey_ps . stdout . strip ( ) or " not found "
except Exception as exc :
huey_line = f " error: { exc } "
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
ports = { port : port_open ( port ) for port in [ 4000 , 4001 , 4002 , 4200 , 8765 ] }
nexus_title = fetch_http_title ( " http://127.0.0.1:4200 " )
evennia_title = fetch_http_title ( " http://127.0.0.1:4001/webclient/ " )
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
evennia_trace = TIMMY_HOME / " training-data " / " evennia " / " live " / today_tick_slug / " nexus-localhost.jsonl "
evennia_events = read_jsonl_rows ( evennia_trace )
last_evennia = evennia_events [ - 1 ] if evennia_events else { }
recent_issue_lines = [ ]
for repo in [ " Timmy_Foundation/timmy-config " , " Timmy_Foundation/the-nexus " , " Timmy_Foundation/timmy-home " ] :
2026-03-25 21:29:43 -04:00
try :
2026-03-28 22:00:56 -04:00
issues = g . list_issues ( repo , state = " open " , sort = " created " , direction = " desc " , limit = 5 )
for issue in issues [ : 3 ] :
recent_issue_lines . append (
f " { repo } # { issue . number } — { issue . title } ( { g . base_url } / { repo } /issues/ { issue . number } ) "
)
2026-03-25 21:29:43 -04:00
except Exception :
2026-03-28 22:00:56 -04:00
continue
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
recent_pr_lines = [ ]
for repo in [ " Timmy_Foundation/timmy-config " , " Timmy_Foundation/the-nexus " , " Timmy_Foundation/timmy-home " ] :
2026-03-25 21:29:43 -04:00
try :
2026-03-28 22:00:56 -04:00
prs = g . list_pulls ( repo , state = " open " , sort = " newest " , limit = 5 )
for pr in prs [ : 2 ] :
recent_pr_lines . append (
f " { repo } # { pr . number } — { pr . title } ( { g . base_url } / { repo } /pulls/ { pr . number } ) "
)
2026-03-25 21:29:43 -04:00
except Exception :
2026-03-28 22:00:56 -04:00
continue
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
research_candidates = [ ]
for label , path in [
( " research " , TIMMY_HOME / " research " ) ,
( " reports " , TIMMY_HOME / " reports " ) ,
( " specs " , TIMMY_HOME / " specs " ) ,
] :
for item in latest_files ( path , limit = 3 ) :
research_candidates . append ( f " { label } : { item [ ' path ' ] } (mtime { item [ ' mtime ' ] } ) " )
what_matters = [
" The official report lane is tracked in timmy-config #87 and now runs through the integrated timmy-config automation path. " ,
" The local world stack is alive: Nexus, Evennia, and the local bridge are all up, with replayable Evennia action telemetry already on disk. " ,
" Bannerlord remains an engineering substrate test. If it fails the thin-adapter test, reject it early instead of building falsework around it. " ,
]
executive_summary = (
" The field is sharper this morning. The report lane is now integrated into timmy-config, the local world stack is visibly alive, "
" and Bannerlord is being held to the thin-adapter standard instead of backlog gravity. "
)
note_prompt = (
" Write a short morning note from Timmy to Alexander. Keep it grounded, warm, and brief. "
" Use the following real facts only: "
f " heartbeat ticks= { tick_count } ; gitea downtime ticks= { gitea_downtime_ticks } ; inference fail ticks before recovery= { inference_fail_ticks } ; "
f " current model= { provider_model } ; Nexus title= { nexus_title } ; Evennia title= { evennia_title } ; latest Evennia room/title= { last_evennia . get ( ' room_name ' , last_evennia . get ( ' title ' , ' unknown ' ) ) } . "
)
note_result = run_hermes_local (
prompt = note_prompt ,
caller_tag = " good_morning_report " ,
disable_all_tools = True ,
skip_context_files = True ,
skip_memory = True ,
max_iterations = 3 ,
)
personal_note = note_result . get ( " response " ) if note_result else None
if not personal_note :
personal_note = (
" Good morning, Alexander. The stack held together through the night, and the local world lane is no longer theoretical. "
" We have more proof than posture now. "
)
markdown = f """ # Timmy Time — Good Morning Report
Date : { today }
Audience : Alexander Whitestone
Status : Generated by timmy - config automation
{ today } · { day_name } · generated { now . strftime ( ' % I: % M % p % Z ' ) }
- - -
## Executive Summary
{ executive_summary }
## Overnight / Local Pulse
- Heartbeat log for ` { today_tick_slug } ` : ` { tick_count } ` ticks recorded in ` { tick_log } `
- Gitea downtime ticks : ` { gitea_downtime_ticks } `
- Inference - failure ticks before recovery : ` { inference_fail_ticks } `
- First green local - inference tick : ` { first_green_tick } `
- Current model health file : ` { health_file } `
- Current provider : ` { provider } `
- Current model : ` { provider_model } `
- Current base URL : ` { provider_base_url } `
- Current inference status : ` { model_status } `
- Huey consumer : ` { huey_line } `
### Local surfaces right now
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
- Nexus port 4200 : ` { ' open ' if ports [ 4200 ] else ' closed ' } ` → title : ` { nexus_title } `
- Evennia telnet 4000 : ` { ' open ' if ports [ 4000 ] else ' closed ' } `
- Evennia web 4001 : ` { ' open ' if ports [ 4001 ] else ' closed ' } ` → title : ` { evennia_title } `
- Evennia websocket 4002 : ` { ' open ' if ports [ 4002 ] else ' closed ' } `
- Local bridge 8765 : ` { ' open ' if ports [ 8765 ] else ' closed ' } `
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
### Evennia proof of life
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
- Trace path : ` { evennia_trace } `
- Event count : ` { len ( evennia_events ) } `
- Latest event type : ` { last_evennia . get ( ' type ' , ' unknown ' ) } `
- Latest room / title : ` { last_evennia . get ( ' room_name ' , last_evennia . get ( ' title ' , ' unknown ' ) ) } `
2026-03-25 21:29:43 -04:00
## Gitea Pulse
2026-03-28 22:00:56 -04:00
### Open issues
{ chr ( 10 ) . join ( f ' - { line } ' for line in recent_issue_lines ) if recent_issue_lines else ' - quiet ' }
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
### Open PRs
{ chr ( 10 ) . join ( f ' - { line } ' for line in recent_pr_lines ) if recent_pr_lines else ' - none ' }
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
## Pertinent Research / Frontier Movement
{ chr ( 10 ) . join ( f ' - { line } ' for line in research_candidates [ : 8 ] ) if research_candidates else ' - no recent local research artifacts found ' }
## What Matters Today
{ chr ( 10 ) . join ( f ' - { item } ' for item in what_matters ) }
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
## One Thing To Look At First
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
Start with ` timmy - config #87`:
- { g . base_url } / Timmy_Foundation / timmy - config / issues / 87
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
That is the durable system front for this report lane .
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
## Evidence Appendix
2026-03-25 21:29:43 -04:00
2026-03-28 22:00:56 -04:00
- ` { health_file } `
- ` { tick_log } `
- ` { evennia_trace } `
- ` http : / / 127.0 .0 .1 : 4200 `
- ` http : / / 127.0 .0 .1 : 4001 / webclient / `
- ` { newest_file ( HERMES_HOME / ' cron ' / ' output ' / ' a77a87392582 ' , ' *.md ' ) or ' no recent health monitor artifact found ' } `
## From Timmy
{ personal_note }
2026-03-25 21:29:43 -04:00
— Timmy
"""
2026-03-28 22:00:56 -04:00
html_report = render_evening_html (
title = " Timmy Time — Good Morning Report " ,
subtitle = f " { today } · { day_name } · generated { now . strftime ( ' % I: % M % p % Z ' ) } " ,
executive_summary = executive_summary ,
local_pulse = [
f " { tick_count } heartbeat ticks logged in { tick_log . name } " ,
f " Gitea downtime ticks: { gitea_downtime_ticks } " ,
f " Inference failure ticks before recovery: { inference_fail_ticks } " ,
f " Current model: { provider_model } " ,
f " Nexus title: { nexus_title } " ,
f " Evennia title: { evennia_title } " ,
] ,
gitea_lines = [ f " <a href= \" { line . split ( ' ( ' ) [ - 1 ] . rstrip ( ' ) ' ) } \" > { html . escape ( line . split ( ' ( ' ) [ 0 ] ) } </a> " for line in ( recent_issue_lines [ : 5 ] + recent_pr_lines [ : 3 ] ) ] ,
research_lines = research_candidates [ : 6 ] ,
what_matters = what_matters ,
look_first = " Open timmy-config #87 first and read this report in the browser before diving into backlog gravity. " ,
)
BRIEFING_DIR . mkdir ( parents = True , exist_ok = True )
markdown_path = BRIEFING_DIR / f " { today } .md "
html_path = BRIEFING_DIR / f " { today } .html "
latest_md = BRIEFING_DIR / " latest.md "
latest_html = BRIEFING_DIR / " latest.html "
verification_path = BRIEFING_DIR / f " { today } -verification.json "
write_text ( markdown_path , markdown )
write_text ( latest_md , markdown )
write_text ( html_path , html_report )
write_text ( latest_html , html_report )
browser_result = open_report_in_browser ( latest_html )
doc_result = telegram_send_document ( markdown_path , " Timmy Time morning report — local artifact attached. " )
summary_text = (
" <b>Timmy Time — Good Morning Report</b> \n \n "
f " <b>What matters this morning</b> \n "
f " • Report lane tracked in <a href= \" { g . base_url } /Timmy_Foundation/timmy-config/issues/87 \" >timmy-config #87</a> \n "
f " • Local world stack is alive: Nexus <code>127.0.0.1:4200</code>, Evennia <code>127.0.0.1:4001/webclient/</code>, bridge <code>127.0.0.1:8765</code> \n "
f " • Bannerlord stays an engineering substrate test, not a builder trap \n \n "
f " <b>Evidence</b> \n "
f " • model health: <code> { health_file } </code> \n "
f " • heartbeat: <code> { tick_log } </code> \n "
f " • evennia trace: <code> { evennia_trace } </code> "
)
summary_result = telegram_send_message ( summary_text )
verification = {
" markdown_path " : str ( markdown_path ) ,
" html_path " : str ( html_path ) ,
" latest_markdown " : str ( latest_md ) ,
" latest_html " : str ( latest_html ) ,
" browser_open " : browser_result ,
" telegram_document " : doc_result ,
" telegram_summary " : summary_result ,
" ports " : ports ,
" titles " : { " nexus " : nexus_title , " evennia " : evennia_title } ,
}
write_json ( verification_path , verification )
return verification
2026-03-25 21:29:43 -04:00
# ── NEW 7: Repo Watchdog ─────────────────────────────────────────────
2026-03-25 19:25:01 -04:00
@huey.periodic_task ( crontab ( minute = " */20 " ) ) # every 20 minutes
def repo_watchdog ( ) :
""" Poll Gitea for new issues/PRs since last check. No webhooks needed. """
state_file = HERMES_HOME / " watchdog_state.json "
state = { }
if state_file . exists ( ) :
try :
state = json . loads ( state_file . read_text ( ) )
except Exception :
pass
g = GiteaClient ( )
new_items = [ ]
for repo in REPOS :
repo_state = state . get ( repo , { " last_issue " : 0 , " last_pr " : 0 } )
# Check issues
try :
issues = g . list_issues ( repo , state = " open " , sort = " created " , direction = " desc " , limit = 5 )
for issue in issues :
if issue . number > repo_state [ " last_issue " ] :
new_items . append ( {
" type " : " issue " ,
" repo " : repo ,
" number " : issue . number ,
" title " : issue . title ,
" creator " : issue . user . login if hasattr ( issue , ' user ' ) and issue . user else " unknown " ,
} )
if issues :
repo_state [ " last_issue " ] = max ( i . number for i in issues )
except Exception :
pass
# Check PRs
try :
prs = g . list_pulls ( repo , state = " open " , sort = " newest " , limit = 5 )
for pr in prs :
if pr . number > repo_state . get ( " last_pr " , 0 ) :
new_items . append ( {
" type " : " pr " ,
" repo " : repo ,
" number " : pr . number ,
" title " : pr . title ,
} )
if prs :
repo_state [ " last_pr " ] = max ( p . number for p in prs )
except Exception :
pass
state [ repo ] = repo_state
state_file . write_text ( json . dumps ( state , indent = 2 ) )
return { " new_items " : len ( new_items ) , " items " : new_items [ : 10 ] }
2026-03-26 07:15:54 -04:00
# ── AGENT WORKERS: Gemini + Grok ─────────────────────────────────────
WORKTREE_BASE = Path . home ( ) / " worktrees "
AGENT_LOG_DIR = HERMES_HOME / " logs "
AGENT_CONFIG = {
" gemini " : {
" tool " : " aider " ,
" model " : " gemini/gemini-2.5-pro-preview-05-06 " ,
" api_key_env " : " GEMINI_API_KEY " ,
" gitea_token_file " : HERMES_HOME / " gemini_token " ,
" timeout " : 600 ,
} ,
" grok " : {
" tool " : " opencode " ,
" model " : " xai/grok-3-fast " ,
" api_key_env " : " XAI_API_KEY " ,
" gitea_token_file " : HERMES_HOME / " grok_gitea_token " ,
" timeout " : 600 ,
} ,
}
def _get_agent_issue ( agent_name ) :
2026-03-26 08:22:53 -04:00
""" Find the next issue assigned to this agent that hasn ' t been worked.
Only picks issues where this agent is the SOLE assignee ( not shared ) . """
2026-03-26 07:15:54 -04:00
token_file = AGENT_CONFIG [ agent_name ] [ " gitea_token_file " ]
if not token_file . exists ( ) :
return None , None
g = GiteaClient ( token = token_file . read_text ( ) . strip ( ) )
for repo in REPOS :
try :
issues = g . find_agent_issues ( repo , agent_name , limit = 10 )
for issue in issues :
2026-03-26 08:22:53 -04:00
# Skip if assigned to multiple agents (avoid collisions)
assignees = [ a . login for a in ( issue . assignees or [ ] ) ] if hasattr ( issue , ' assignees ' ) else [ ]
other_agents = [ a for a in assignees if a in AGENT_CONFIG and a != agent_name ]
if other_agents :
continue
# Skip if already being worked on by this agent
comments = g . list_comments ( repo , issue . number )
2026-03-26 07:15:54 -04:00
if any ( c . body and " working on " in c . body . lower ( ) and agent_name in c . body . lower ( ) for c in comments ) :
continue
return repo , issue
except Exception :
continue
return None , None
def _run_agent ( agent_name , repo , issue ) :
""" Clone, branch, run agent tool, push, open PR. """
cfg = AGENT_CONFIG [ agent_name ]
token = cfg [ " gitea_token_file " ] . read_text ( ) . strip ( )
repo_owner , repo_name = repo . split ( " / " )
branch = f " { agent_name } /issue- { issue . number } "
workdir = WORKTREE_BASE / f " { agent_name } - { issue . number } "
log_file = AGENT_LOG_DIR / f " { agent_name } -worker.log "
def log ( msg ) :
with open ( log_file , " a " ) as f :
f . write ( f " [ { datetime . now ( ) . strftime ( ' % Y- % m- %d % H: % M: % S ' ) } ] { msg } \n " )
log ( f " === Starting # { issue . number } : { issue . title } === " )
# Comment that we're working on it
g = GiteaClient ( token = token )
g . create_comment ( repo , issue . number ,
f " 🔧 ` { agent_name } ` working on this via Huey. Branch: ` { branch } ` " )
# Clone
clone_url = f " http:// { agent_name } : { token } @143.198.27.163:3000/ { repo } .git "
if workdir . exists ( ) :
subprocess . run ( [ " rm " , " -rf " , str ( workdir ) ] , timeout = 30 )
result = subprocess . run (
[ " git " , " clone " , " --depth " , " 50 " , clone_url , str ( workdir ) ] ,
capture_output = True , text = True , timeout = 120
)
if result . returncode != 0 :
log ( f " Clone failed: { result . stderr } " )
return { " status " : " clone_failed " , " error " : result . stderr [ : 200 ] }
# Create branch
subprocess . run (
[ " git " , " checkout " , " -b " , branch ] ,
cwd = str ( workdir ) , capture_output = True , timeout = 10
)
# Build prompt
prompt = (
f " Fix issue # { issue . number } : { issue . title } \n \n "
f " { issue . body or ' No description. ' } \n \n "
f " Make minimal, focused changes. Only modify files directly related to this issue. "
)
# Run agent tool
env = os . environ . copy ( )
if cfg [ " api_key_env " ] == " XAI_API_KEY " :
env [ " XAI_API_KEY " ] = Path ( Path . home ( ) / " .config/grok/api_key " ) . read_text ( ) . strip ( )
if cfg [ " tool " ] == " aider " :
cmd = [
" aider " ,
" --model " , cfg [ " model " ] ,
" --no-auto-commits " ,
" --yes-always " ,
" --no-suggest-shell-commands " ,
" --message " , prompt ,
]
else : # opencode
cmd = [
" opencode " , " run " ,
" -m " , cfg [ " model " ] ,
" --no-interactive " ,
prompt ,
]
log ( f " Running: { cfg [ ' tool ' ] } with { cfg [ ' model ' ] } " )
try :
result = subprocess . run (
cmd , cwd = str ( workdir ) , capture_output = True , text = True ,
timeout = cfg [ " timeout " ] , env = env
)
log ( f " Exit code: { result . returncode } " )
log ( f " Stdout (last 500): { result . stdout [ - 500 : ] } " )
if result . stderr :
log ( f " Stderr (last 300): { result . stderr [ - 300 : ] } " )
except subprocess . TimeoutExpired :
log ( " TIMEOUT " )
return { " status " : " timeout " }
# Check if anything changed
diff_result = subprocess . run (
[ " git " , " diff " , " --stat " ] , cwd = str ( workdir ) ,
capture_output = True , text = True , timeout = 10
)
if not diff_result . stdout . strip ( ) :
log ( " No changes produced " )
g . create_comment ( repo , issue . number ,
f " ⚠️ ` { agent_name } ` produced no changes for this issue. Skipping. " )
subprocess . run ( [ " rm " , " -rf " , str ( workdir ) ] , timeout = 30 )
return { " status " : " no_changes " }
# Commit, push, open PR
subprocess . run ( [ " git " , " add " , " -A " ] , cwd = str ( workdir ) , timeout = 10 )
subprocess . run (
[ " git " , " commit " , " -m " , f " [ { agent_name } ] { issue . title } (# { issue . number } ) " ] ,
cwd = str ( workdir ) , capture_output = True , timeout = 30
)
push_result = subprocess . run (
[ " git " , " push " , " -u " , " origin " , branch ] ,
cwd = str ( workdir ) , capture_output = True , text = True , timeout = 60
)
if push_result . returncode != 0 :
log ( f " Push failed: { push_result . stderr } " )
return { " status " : " push_failed " , " error " : push_result . stderr [ : 200 ] }
# Open PR
try :
pr = g . create_pull (
repo ,
title = f " [ { agent_name } ] { issue . title } (# { issue . number } ) " ,
head = branch ,
base = " main " ,
body = f " Closes # { issue . number } \n \n Generated by ` { agent_name } ` via Huey worker. " ,
)
log ( f " PR # { pr . number } created " )
return { " status " : " pr_created " , " pr " : pr . number }
except Exception as e :
log ( f " PR creation failed: { e } " )
return { " status " : " pr_failed " , " error " : str ( e ) [ : 200 ] }
finally :
subprocess . run ( [ " rm " , " -rf " , str ( workdir ) ] , timeout = 30 )
@huey.periodic_task ( crontab ( minute = " */20 " ) )
def gemini_worker ( ) :
""" Gemini picks up an assigned issue, codes it with aider, opens a PR. """
repo , issue = _get_agent_issue ( " gemini " )
if not issue :
return { " status " : " idle " , " reason " : " no issues assigned to gemini " }
return _run_agent ( " gemini " , repo , issue )
@huey.periodic_task ( crontab ( minute = " */20 " ) )
def grok_worker ( ) :
""" Grok picks up an assigned issue, codes it with opencode, opens a PR. """
repo , issue = _get_agent_issue ( " grok " )
if not issue :
return { " status " : " idle " , " reason " : " no issues assigned to grok " }
return _run_agent ( " grok " , repo , issue )
# ── PR Cross-Review ──────────────────────────────────────────────────
@huey.periodic_task ( crontab ( minute = " */30 " ) )
def cross_review_prs ( ) :
""" Gemini reviews Grok ' s PRs. Grok reviews Gemini ' s PRs. """
results = [ ]
for reviewer , author in [ ( " gemini " , " grok " ) , ( " grok " , " gemini " ) ] :
cfg = AGENT_CONFIG [ reviewer ]
token_file = cfg [ " gitea_token_file " ]
if not token_file . exists ( ) :
continue
g = GiteaClient ( token = token_file . read_text ( ) . strip ( ) )
for repo in REPOS :
try :
prs = g . list_pulls ( repo , state = " open " , limit = 10 )
for pr in prs :
# Only review the other agent's PRs
if not pr . title . startswith ( f " [ { author } ] " ) :
continue
# Skip if already reviewed
2026-03-26 08:22:53 -04:00
comments = g . list_comments ( repo , pr . number )
2026-03-26 07:15:54 -04:00
if any ( c . body and f " reviewed by { reviewer } " in c . body . lower ( ) for c in comments ) :
continue
# Get the diff
files = g . get_pull_files ( repo , pr . number )
net = sum ( f . additions - f . deletions for f in files )
file_list = " , " . join ( f . filename for f in files [ : 5 ] )
# Build review prompt
review_prompt = (
f " Review PR # { pr . number } : { pr . title } \n "
f " Files: { file_list } \n "
f " Net change: + { net } lines \n \n "
f " Is this PR focused, correct, and ready to merge? "
f " Reply with APPROVE or REQUEST_CHANGES and a brief reason. "
)
# Run reviewer's tool for analysis
env = os . environ . copy ( )
if cfg [ " api_key_env " ] == " XAI_API_KEY " :
env [ " XAI_API_KEY " ] = Path ( Path . home ( ) / " .config/grok/api_key " ) . read_text ( ) . strip ( )
if cfg [ " tool " ] == " aider " :
cmd = [ " aider " , " --model " , cfg [ " model " ] ,
" --no-auto-commits " , " --yes-always " ,
" --no-suggest-shell-commands " ,
" --message " , review_prompt ]
else :
cmd = [ " opencode " , " run " , " -m " , cfg [ " model " ] ,
" --no-interactive " , review_prompt ]
try :
result = subprocess . run (
cmd , capture_output = True , text = True ,
timeout = 120 , env = env , cwd = " /tmp "
)
review_text = result . stdout [ - 1000 : ] if result . stdout else " No output "
except Exception as e :
review_text = f " Review failed: { e } "
# Post review as comment
g . create_comment ( repo , pr . number ,
f " **Reviewed by ` { reviewer } `:** \n \n { review_text } " )
results . append ( { " reviewer " : reviewer , " pr " : pr . number , " repo " : repo } )
except Exception :
continue
return { " reviews " : len ( results ) , " details " : results }