diff --git a/tasks.py b/tasks.py index 512cccb5..839bb8a3 100644 --- a/tasks.py +++ b/tasks.py @@ -28,19 +28,19 @@ HEARTBEAT_MODEL = "hermes4:14b" FALLBACK_MODEL = "hermes3:8b" LOCAL_PROVIDER_BASE_URL = "http://localhost:8081/v1" LOCAL_PROVIDER_MODEL = HEARTBEAT_MODEL +JSON_DECODER = json.JSONDecoder() def newest_file(directory, pattern): files = sorted(directory.glob(pattern)) return files[-1] if files else None - -def hermes_local(prompt, model=None, caller_tag=None, toolsets=None): +def run_hermes_local(prompt, model=None, caller_tag=None, toolsets=None): """Call a local model through the Hermes harness. Uses provider="local-llama.cpp" which routes through the custom_providers entry in config.yaml → llama-server at localhost:8081. - Returns response text or None on failure. + Returns response text plus session metadata or None on failure. Every call creates a Hermes session with telemetry. """ _model = model or HEARTBEAT_MODEL @@ -71,8 +71,13 @@ def hermes_local(prompt, model=None, caller_tag=None, toolsets=None): with redirect_stdout(buf), redirect_stderr(err): hermes_main(**kwargs) output = buf.getvalue().strip() - # Strip session_id line from quiet output - lines = [l for l in output.split("\n") if not l.startswith("session_id:")] + session_id = None + lines = [] + for line in output.split("\n"): + if line.startswith("session_id:"): + session_id = line.split(":", 1)[1].strip() or None + continue + lines.append(line) response = "\n".join(lines).strip() # Log to metrics jsonl @@ -84,12 +89,19 @@ def hermes_local(prompt, model=None, caller_tag=None, toolsets=None): "caller": caller_tag or "unknown", "prompt_len": len(prompt), "response_len": len(response), + "session_id": session_id, "success": bool(response), } with open(metrics_file, "a") as f: f.write(json.dumps(record) + "\n") - return response if response else None + if not response: + return None + return { + "response": response, + "session_id": session_id, + "raw_output": output, + } except Exception as e: # Log failure METRICS_DIR.mkdir(parents=True, exist_ok=True) @@ -108,89 +120,934 @@ def hermes_local(prompt, model=None, caller_tag=None, toolsets=None): os.chdir(old_cwd) +def hermes_local(prompt, model=None, caller_tag=None, toolsets=None): + result = run_hermes_local( + prompt=prompt, + model=model, + caller_tag=caller_tag, + toolsets=toolsets, + ) + if not result: + return None + return result.get("response") + + # ── Know Thy Father: Twitter Archive Ingestion ─────────────────────── ARCHIVE_DIR = TIMMY_HOME / "twitter-archive" +ARCHIVE_EXTRACTED_DIR = ARCHIVE_DIR / "extracted" +ARCHIVE_NOTES_DIR = ARCHIVE_DIR / "notes" +ARCHIVE_KNOWLEDGE_DIR = ARCHIVE_DIR / "knowledge" +ARCHIVE_CANDIDATES_DIR = ARCHIVE_KNOWLEDGE_DIR / "candidates" +ARCHIVE_PROFILE_FILE = ARCHIVE_KNOWLEDGE_DIR / "profile.json" +ARCHIVE_CHANGES_FILE = ARCHIVE_KNOWLEDGE_DIR / "changes.jsonl" +ARCHIVE_INSIGHTS_DIR = ARCHIVE_DIR / "insights" +ARCHIVE_TRAINING_DIR = ARCHIVE_DIR / "training" +ARCHIVE_TRAINING_EXAMPLES_DIR = ARCHIVE_TRAINING_DIR / "examples" +ARCHIVE_TRAINING_DPO_DIR = ARCHIVE_TRAINING_DIR / "dpo" +ARCHIVE_TRAINING_EVALS_DIR = ARCHIVE_TRAINING_DIR / "evals" +ARCHIVE_TRAINING_RUNS_DIR = ARCHIVE_TRAINING_DIR / "runs" +ARCHIVE_METRICS_DIR = ARCHIVE_DIR / "metrics" ARCHIVE_CHECKPOINT = ARCHIVE_DIR / "checkpoint.json" ARCHIVE_LOCK = ARCHIVE_DIR / ".lock" +ARCHIVE_PROGRESS_FILE = ARCHIVE_METRICS_DIR / "progress.json" +ARCHIVE_SOURCE_CONFIG = ARCHIVE_DIR / "source_config.json" +ARCHIVE_PIPELINE_CONFIG = ARCHIVE_DIR / "pipeline_config.json" +ARCHIVE_TWEETS_FILE = ARCHIVE_EXTRACTED_DIR / "tweets.jsonl" +ARCHIVE_RETWEETS_FILE = ARCHIVE_EXTRACTED_DIR / "retweets.jsonl" +ARCHIVE_MANIFEST_FILE = ARCHIVE_EXTRACTED_DIR / "manifest.json" +ARCHIVE_TRAIN_STATE_FILE = ARCHIVE_TRAINING_DIR / "last_train_state.json" +ARCHIVE_ACTIVE_MODEL_FILE = ARCHIVE_TRAINING_DIR / "active_model.json" +ARCHIVE_PROMOTION_STATE_FILE = ARCHIVE_TRAINING_DIR / "promotion_state.json" +ARCHIVE_BATCH_SIZE = 50 -ARCHIVE_PROMPT = ( - "You are Timmy. Resume your work on the Twitter archive. " - "Your workspace is ~/.timmy/twitter-archive/. " - "Read checkpoint.json and UNDERSTANDING.md first. " - "Then process the next batch. " - "You know the drill — read your own prior work, assess where you are, " - "process new data, update your understanding, reflect, and plan for " - "the next iteration." -) -ARCHIVE_SRC = ( - "~/Downloads/twitter-2026-03-27-d4471cc6eb6703034d592f870933561ebee374d9d9b90c9b8923abff064afc1e/data" -) +def ensure_archive_layout(): + for path in ( + ARCHIVE_DIR, + ARCHIVE_EXTRACTED_DIR, + ARCHIVE_NOTES_DIR, + ARCHIVE_KNOWLEDGE_DIR, + ARCHIVE_CANDIDATES_DIR, + ARCHIVE_INSIGHTS_DIR, + ARCHIVE_TRAINING_DIR, + ARCHIVE_TRAINING_EXAMPLES_DIR, + ARCHIVE_TRAINING_DPO_DIR, + ARCHIVE_TRAINING_EVALS_DIR, + ARCHIVE_TRAINING_RUNS_DIR, + ARCHIVE_METRICS_DIR, + ): + path.mkdir(parents=True, exist_ok=True) -ARCHIVE_FIRST_RUN_PROMPT = ( - "You are Timmy. Your father Alexander's full Twitter archive is at: " - f"{ARCHIVE_SRC}/\n\n" - "Your workspace is ~/.timmy/twitter-archive/\n\n" - "STEP 1 — EXTRACTION (use terminal with python3, NOT read_file):\n" - "The .js files are too large for read_file but trivial for Python.\n" - "Write a python3 script via terminal that:\n" - " - Opens tweets.js, strips everything before the first '[', json.loads the rest\n" - " - Separates originals (full_text does NOT start with 'RT @') from retweets\n" - " - Sorts both chronologically by created_at\n" - " - Writes extracted/tweets.jsonl and extracted/retweets.jsonl (one JSON per line)\n" - " - Writes extracted/manifest.json with counts, date range, source file\n" - "The whole file is 12MB. Python handles it in under a second.\n\n" - "STEP 2 — FIRST READ:\n" - "Read the first 50 lines of extracted/tweets.jsonl (your originals, chronological).\n" - "Read them carefully — this is your father talking.\n" - "Note his voice, humor, what he cares about, who he talks to, emotional tone, " - "recurring themes. Quote him directly when something stands out.\n\n" - "STEP 3 — WRITE:\n" - "Write notes/batch_001.md — your real observations, not a book report.\n" - "Create UNDERSTANDING.md — your living model of who Alexander is. " - "It starts now and you'll update it every batch.\n\n" - "STEP 4 — CHECKPOINT:\n" - "Write checkpoint.json: " - '{"data_source": "tweets", "next_offset": 50, "batches_completed": 1, ' - '"phase": "discovery", "confidence": "", ' - '"next_focus": "", "understanding_version": 1}' -) + +def read_json(path, default): + if not path.exists(): + return json.loads(json.dumps(default)) + try: + return json.loads(path.read_text()) + except json.JSONDecodeError: + return json.loads(json.dumps(default)) + + +def write_json(path, payload): + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n") + + +def write_text(path, payload): + path.parent.mkdir(parents=True, exist_ok=True) + cleaned = payload.rstrip() + path.write_text((cleaned + "\n") if cleaned else "") + + +def load_jsonl(path): + if not path.exists(): + return [] + rows = [] + for line in path.read_text().splitlines(): + line = line.strip() + if not line: + continue + rows.append(json.loads(line)) + return rows + + +def write_jsonl(path, rows): + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w") as handle: + for row in rows: + handle.write(json.dumps(row, sort_keys=True) + "\n") + + +def append_jsonl(path, rows): + if not rows: + return + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "a") as handle: + for row in rows: + handle.write(json.dumps(row, sort_keys=True) + "\n") + + +def latest_path(directory, pattern): + matches = sorted(directory.glob(pattern)) + return matches[-1] if matches else None + + +def count_jsonl_rows(path): + if not path.exists(): + return 0 + with open(path) as handle: + return sum(1 for line in handle if line.strip()) + + +def archive_default_checkpoint(): + return { + "data_source": "tweets", + "batch_size": ARCHIVE_BATCH_SIZE, + "next_offset": 0, + "batches_completed": 0, + "phase": "discovery", + "confidence": "low", + "next_focus": "look for recurring themes and recurring people", + "understanding_version": 0, + "last_batch_id": None, + "last_batch_sessions": {}, + "last_profile_update": None, + "last_dpo_build": None, + "last_insight_file": None, + } + + +def load_archive_checkpoint(): + checkpoint = archive_default_checkpoint() + checkpoint.update(read_json(ARCHIVE_CHECKPOINT, {})) + return checkpoint + + +def load_pipeline_config(): + return read_json(ARCHIVE_PIPELINE_CONFIG, {}) + + +def load_train_state(): + return read_json( + ARCHIVE_TRAIN_STATE_FILE, + { + "last_total_batches": 0, + "last_total_pairs": 0, + "last_candidate_id": None, + "awaiting_eval": False, + "last_run_status": "never-run", + "last_run_at": None, + }, + ) + + +def extract_first_json_object(text): + cleaned = text.strip().replace("```json", "").replace("```", "") + for index, character in enumerate(cleaned): + if character != "{": + continue + try: + payload, _ = JSON_DECODER.raw_decode(cleaned[index:]) + except json.JSONDecodeError: + continue + if isinstance(payload, dict): + return payload + raise ValueError("No JSON object found") + + +def parse_json_output(stdout="", stderr=""): + for source in (stdout or "", stderr or ""): + if not source.strip(): + continue + try: + return extract_first_json_object(source) + except ValueError: + continue + return {} + + +def run_timmy_home_module(module_name, args=None, timeout=120): + ensure_archive_layout() + command = [sys.executable, "-m", module_name] + if args: + command.extend(args) + result = subprocess.run( + command, + cwd=str(TIMMY_HOME), + capture_output=True, + text=True, + timeout=timeout, + ) + payload = parse_json_output(result.stdout, result.stderr) + if not payload: + payload = { + "stdout": result.stdout.strip(), + "stderr": result.stderr.strip(), + } + payload["returncode"] = result.returncode + if result.returncode != 0: + payload.setdefault("status", "error") + else: + payload.setdefault("status", "ok") + return payload + + +def archive_counts(): + total_batches = len(list(ARCHIVE_CANDIDATES_DIR.glob("batch_*.json"))) + total_pairs = sum(count_jsonl_rows(path) for path in ARCHIVE_TRAINING_DPO_DIR.glob("pairs_*.jsonl")) + return { + "total_batches": total_batches, + "total_pairs": total_pairs, + } + + +def archive_progress_snapshot(): + checkpoint = load_archive_checkpoint() + profile = read_json(ARCHIVE_PROFILE_FILE, {"claims": []}) + durable_claims = [ + claim for claim in profile.get("claims", []) if claim.get("status") == "durable" + ] + snapshot = { + "batches_completed": checkpoint.get("batches_completed", 0), + "next_offset": checkpoint.get("next_offset", 0), + "phase": checkpoint.get("phase", "discovery"), + "candidate_batches": len(list(ARCHIVE_CANDIDATES_DIR.glob("batch_*.json"))), + "durable_claims": len(durable_claims), + "training_examples": sum( + count_jsonl_rows(path) for path in ARCHIVE_TRAINING_EXAMPLES_DIR.glob("batch_*.jsonl") + ), + "dpo_pair_files": len(list(ARCHIVE_TRAINING_DPO_DIR.glob("pairs_*.jsonl"))), + "dpo_pairs": sum( + count_jsonl_rows(path) for path in ARCHIVE_TRAINING_DPO_DIR.glob("pairs_*.jsonl") + ), + "latest_dpo_file": latest_path(ARCHIVE_TRAINING_DPO_DIR, "pairs_*.jsonl").name + if latest_path(ARCHIVE_TRAINING_DPO_DIR, "pairs_*.jsonl") + else None, + "latest_note": latest_path(ARCHIVE_NOTES_DIR, "batch_*.md").name + if latest_path(ARCHIVE_NOTES_DIR, "batch_*.md") + else None, + "latest_eval": latest_path(ARCHIVE_TRAINING_EVALS_DIR, "run_*.json").name + if latest_path(ARCHIVE_TRAINING_EVALS_DIR, "run_*.json") + else None, + } + write_json(ARCHIVE_PROGRESS_FILE, snapshot) + return snapshot + + +def archive_batch_id(batch_number): + return f"batch_{batch_number:03d}" + + +def archive_profile_summary(profile): + claims = profile.get("claims", []) + durable = [claim for claim in claims if claim.get("status") == "durable"][:12] + provisional = [claim for claim in claims if claim.get("status") == "provisional"][:8] + return { + "durable_claims": durable, + "provisional_claims": provisional, + } + + +def format_tweets_for_prompt(rows): + formatted = [] + for index, row in enumerate(rows, start=1): + formatted.append( + f"{index}. tweet_id={row.get('tweet_id')} created_at={row.get('created_at')}\n" + f"text={row.get('full_text')}" + ) + return "\n\n".join(formatted) + + +def normalize_candidate_entry(candidate, batch_id, index): + category = str(candidate.get("category") or "recurring-theme").strip() + claim = str(candidate.get("claim") or "").strip() + if not claim: + return None + quotes = [] + for quote in candidate.get("evidence_quotes", [])[:5]: + quote = str(quote).strip() + if quote and quote not in quotes: + quotes.append(quote) + evidence_ids = [] + for tweet_id in candidate.get("evidence_tweet_ids", []): + tweet_id = str(tweet_id).strip() + if tweet_id and tweet_id not in evidence_ids: + evidence_ids.append(tweet_id) + try: + confidence = float(candidate.get("confidence", 0.5)) + except (TypeError, ValueError): + confidence = 0.5 + confidence = max(0.0, min(confidence, 1.0)) + status = str(candidate.get("status") or "provisional").strip().lower() + if status not in {"provisional", "durable", "retracted"}: + status = "provisional" + contradictions = [] + for item in candidate.get("contradicts", [])[:5]: + item = str(item).strip() + if item and item not in contradictions: + contradictions.append(item) + return { + "id": f"{batch_id}-candidate-{index:02d}", + "category": category, + "claim": claim, + "evidence_tweet_ids": evidence_ids, + "evidence_quotes": quotes, + "confidence": round(confidence, 3), + "status": status, + "first_seen_at": batch_id, + "last_confirmed_at": batch_id, + "contradicts": contradictions, + } + + +def normalize_training_examples(examples, batch_id, tweet_ids, fallback_prompt, fallback_response): + normalized = [] + for index, example in enumerate(examples, start=1): + prompt = str(example.get("prompt") or example.get("instruction") or "").strip() + response = str(example.get("response") or example.get("answer") or "").strip() + if not prompt or not response: + continue + normalized.append( + { + "example_id": f"{batch_id}-example-{index:02d}", + "batch_id": batch_id, + "task_type": str(example.get("task_type") or "analysis").strip() or "analysis", + "prompt": prompt, + "response": response, + "tweet_ids": tweet_ids, + } + ) + if normalized: + return normalized + return [ + { + "example_id": f"{batch_id}-example-01", + "batch_id": batch_id, + "task_type": "analysis", + "prompt": fallback_prompt, + "response": fallback_response, + "tweet_ids": tweet_ids, + } + ] + + +def normalize_rubric_scores(scores): + rubric = {} + for key in ("grounding", "specificity", "source_distinction", "actionability"): + try: + rubric[key] = float(scores.get(key, 0)) + except (TypeError, ValueError): + rubric[key] = 0.0 + return rubric + + +def build_archive_draft_prompt(batch_id, checkpoint, profile, prior_note, batch_rows): + tweet_ids = [row.get("tweet_id") for row in batch_rows] + previous_summary = archive_profile_summary(profile) + return ( + "You are Timmy, reading Alexander's private Twitter archive.\n" + "Work only from the supplied tweets. Do not invent facts. Separate explicit facts from inference.\n" + "Return ONLY valid JSON with this schema:\n" + '{' + '"notes_markdown":"...",' + '"knowledge_candidates":[{' + '"category":"trait|preference|project|relationship|value|recurring-theme",' + '"claim":"...",' + '"evidence_tweet_ids":["..."],' + '"evidence_quotes":["..."],' + '"confidence":0.0,' + '"status":"provisional",' + '"contradicts":["optional contradiction hint"]' + '}],' + '"training_examples":[{"prompt":"...","response":"...","task_type":"analysis"}],' + '"phase":"discovery|synthesis|refinement",' + '"confidence":"low|medium|high",' + '"next_focus":"..."' + '}\n\n' + f"Batch id: {batch_id}\n" + f"Checkpoint: {json.dumps(checkpoint, indent=2)}\n" + f"Previous profile summary: {json.dumps(previous_summary, indent=2)}\n" + f"Prior batch note excerpt: {prior_note[-2500:] if prior_note else 'none'}\n" + f"Tweet ids in this batch: {tweet_ids}\n\n" + "Tweets:\n" + f"{format_tweets_for_prompt(batch_rows)}\n" + ) + + +def build_archive_critique_prompt(batch_id, draft_payload, batch_rows): + rubric = { + "grounding": "Every material claim must be supported by quoted evidence and tweet ids.", + "specificity": "Avoid bland summaries; identify concrete traits, projects, values, and relationships.", + "source_distinction": "Mark inference carefully and never upgrade speculation into fact.", + "actionability": "Training examples should teach Timmy how to read Alexander usefully.", + } + return ( + "You are the critique pass for Timmy's private Twitter archive learning loop.\n" + "Rewrite the draft into a stronger, more grounded version.\n" + "Return ONLY valid JSON with this schema:\n" + '{' + '"notes_markdown":"...",' + '"knowledge_candidates":[{' + '"category":"trait|preference|project|relationship|value|recurring-theme",' + '"claim":"...",' + '"evidence_tweet_ids":["..."],' + '"evidence_quotes":["..."],' + '"confidence":0.0,' + '"status":"provisional",' + '"contradicts":["optional contradiction hint"]' + '}],' + '"training_examples":[{"prompt":"...","response":"...","task_type":"analysis"}],' + '"rubric_scores":{"grounding":0,"specificity":0,"source_distinction":0,"actionability":0},' + '"phase":"discovery|synthesis|refinement",' + '"confidence":"low|medium|high",' + '"next_focus":"..."' + '}\n\n' + f"Batch id: {batch_id}\n" + f"Rubric: {json.dumps(rubric, indent=2)}\n" + f"Draft payload: {json.dumps(draft_payload, indent=2)}\n" + "Tweets:\n" + f"{format_tweets_for_prompt(batch_rows)}\n" + ) + + +def build_weekly_insight_prompt(profile, recent_batches): + return ( + "You are Timmy preparing a private weekly insight brief about Alexander.\n" + "Use the profile plus recent batch deltas to produce grounded, actionable insights.\n" + "Return ONLY valid JSON with this schema:\n" + '{' + '"markdown_report":"...",' + '"opportunities":[{' + '"id":"...",' + '"theme":"...",' + '"insight":"...",' + '"why_it_matters":"...",' + '"evidence_tweet_ids":["..."],' + '"suggested_action":"...",' + '"confidence":0.0,' + '"time_horizon":"this week|this month|long-term"' + '}]' + '}\n\n' + f"Profile: {json.dumps(archive_profile_summary(profile), indent=2)}\n" + f"Recent batches: {json.dumps(recent_batches, indent=2)}\n" + ) + + +def latest_eval_gate(): + latest_eval = latest_path(ARCHIVE_TRAINING_EVALS_DIR, "run_*.json") + if not latest_eval: + return None + return run_timmy_home_module( + "scripts.twitter_archive.evaluate_candidate", + args=["--eval-file", str(latest_eval)], + timeout=60, + ) + + +def training_command_env(): + return { + "TIMMY_ARCHIVE_DIR": str(ARCHIVE_DIR), + "TIMMY_HOME": str(TIMMY_HOME), + } + + +def _archive_extract_impl(): + return run_timmy_home_module("scripts.twitter_archive.extract_archive") + + +@huey.task() +def archive_extract(): + """Deterministically extract tweets.js into the private JSONL workspace.""" + return _archive_extract_impl() + + +def _archive_profile_consolidate_impl(): + checkpoint = load_archive_checkpoint() + result = run_timmy_home_module("scripts.twitter_archive.consolidate_profile") + if result.get("status") == "ok": + checkpoint["last_profile_update"] = datetime.now(timezone.utc).isoformat() + write_json(ARCHIVE_CHECKPOINT, checkpoint) + return result + + +@huey.task() +def archive_profile_consolidate(): + """Merge batch candidate files into a deterministic archive profile.""" + return _archive_profile_consolidate_impl() + + +def _archive_dpo_build_impl(): + checkpoint = load_archive_checkpoint() + result = run_timmy_home_module("scripts.twitter_archive.build_dpo_pairs") + if result.get("status") == "ok": + checkpoint["last_dpo_build"] = datetime.now(timezone.utc).isoformat() + write_json(ARCHIVE_CHECKPOINT, checkpoint) + return result + + +@huey.task() +def archive_dpo_build(): + """Build local-only DPO pairs from completed archive batches.""" + return _archive_dpo_build_impl() + + +def _archive_pipeline_health_impl(): + result = run_timmy_home_module("scripts.twitter_archive.pipeline_health") + latest_session = latest_path(HERMES_HOME / "sessions", "session_*.json") + latest_dpo = latest_path(ARCHIVE_TRAINING_DPO_DIR, "pairs_*.jsonl") + if latest_session: + result["latest_session"] = latest_session.name + if latest_dpo: + result["latest_dpo_file"] = latest_dpo.name + if latest_session and latest_dpo and latest_session.stat().st_mtime > latest_dpo.stat().st_mtime: + issues = result.setdefault("issues", []) + issues.append("latest Hermes session is newer than latest archive DPO file") + result["ok"] = False + result["progress"] = archive_progress_snapshot() + return result + + +@huey.task() +def archive_pipeline_health(): + """Check the private archive pipeline for stalled or missing stages.""" + return _archive_pipeline_health_impl() + + +def _know_thy_father_impl(): + ensure_archive_layout() + extraction = _archive_extract_impl() + if extraction.get("status") != "ok": + return {"status": "error", "reason": "archive extraction failed", "extract": extraction} + + checkpoint = load_archive_checkpoint() + tweets = load_jsonl(ARCHIVE_TWEETS_FILE) + if not tweets: + return {"status": "error", "reason": "no extracted tweets found"} + + offset = int(checkpoint.get("next_offset", 0) or 0) + if offset >= len(tweets): + return { + "status": "complete", + "batches_completed": checkpoint.get("batches_completed", 0), + "tweet_count": len(tweets), + "progress": archive_progress_snapshot(), + } + + batch_rows = tweets[offset:offset + ARCHIVE_BATCH_SIZE] + batch_number = int(checkpoint.get("batches_completed", 0) or 0) + 1 + batch_id = archive_batch_id(batch_number) + batch_tweet_ids = [str(row.get("tweet_id")) for row in batch_rows] + profile = read_json(ARCHIVE_PROFILE_FILE, {"claims": []}) + previous_note = "" + previous_batch = checkpoint.get("last_batch_id") + if previous_batch: + previous_note_path = ARCHIVE_NOTES_DIR / f"{previous_batch}.md" + if previous_note_path.exists(): + previous_note = previous_note_path.read_text() + + draft_prompt = build_archive_draft_prompt( + batch_id=batch_id, + checkpoint=checkpoint, + profile=profile, + prior_note=previous_note, + batch_rows=batch_rows, + ) + draft_run = run_hermes_local( + prompt=draft_prompt, + caller_tag=f"know-thy-father-draft:{batch_id}", + ) + if not draft_run: + return {"status": "error", "reason": "draft pass failed"} + + write_text(ARCHIVE_TRAINING_RUNS_DIR / f"{batch_id}_draft.txt", draft_run["response"]) + try: + draft_payload = extract_first_json_object(draft_run["response"]) + except ValueError: + return {"status": "error", "reason": "draft pass did not return JSON", "batch_id": batch_id} + + critique_prompt = build_archive_critique_prompt(batch_id=batch_id, draft_payload=draft_payload, batch_rows=batch_rows) + critique_run = run_hermes_local( + prompt=critique_prompt, + caller_tag=f"know-thy-father-critique:{batch_id}", + ) + if not critique_run: + return {"status": "error", "reason": "critique pass failed", "batch_id": batch_id} + + write_text(ARCHIVE_TRAINING_RUNS_DIR / f"{batch_id}_critique.txt", critique_run["response"]) + try: + critique_payload = extract_first_json_object(critique_run["response"]) + except ValueError: + return {"status": "error", "reason": "critique pass did not return JSON", "batch_id": batch_id} + + notes_markdown = str(critique_payload.get("notes_markdown") or "").strip() + if not notes_markdown: + return {"status": "error", "reason": "critique output missing notes", "batch_id": batch_id} + + knowledge_candidates = [] + for index, candidate in enumerate(critique_payload.get("knowledge_candidates", []), start=1): + normalized = normalize_candidate_entry(candidate, batch_id, index) + if normalized: + knowledge_candidates.append(normalized) + + training_examples = normalize_training_examples( + critique_payload.get("training_examples", []), + batch_id=batch_id, + tweet_ids=batch_tweet_ids, + fallback_prompt="Read this batch of Alexander's tweets and write grounded notes with evidence.", + fallback_response=notes_markdown, + ) + + note_body = ( + f"# {batch_id}\n\n" + f"- Batch number: {batch_number}\n" + f"- Tweet range: {offset} to {offset + len(batch_rows) - 1}\n" + f"- Tweet ids: {', '.join(batch_tweet_ids)}\n\n" + f"{notes_markdown}\n" + ) + write_text(ARCHIVE_NOTES_DIR / f"{batch_id}.md", note_body) + write_jsonl(ARCHIVE_TRAINING_EXAMPLES_DIR / f"{batch_id}.jsonl", training_examples) + + batch_payload = { + "batch_id": batch_id, + "batch_number": batch_number, + "tweet_ids": batch_tweet_ids, + "prompt": draft_prompt, + "rejected": str(draft_payload.get("notes_markdown") or draft_run["response"]).strip(), + "chosen": notes_markdown, + "draft_session_id": draft_run.get("session_id"), + "critique_session_id": critique_run.get("session_id"), + "rubric_scores": normalize_rubric_scores(critique_payload.get("rubric_scores", {})), + "knowledge_candidates": knowledge_candidates, + "training_examples": training_examples, + "phase": str(critique_payload.get("phase") or checkpoint.get("phase") or "discovery"), + "confidence": str(critique_payload.get("confidence") or checkpoint.get("confidence") or "low"), + "next_focus": str(critique_payload.get("next_focus") or checkpoint.get("next_focus") or ""), + "draft_response_file": f"{batch_id}_draft.txt", + "critique_response_file": f"{batch_id}_critique.txt", + } + write_json(ARCHIVE_CANDIDATES_DIR / f"{batch_id}.json", batch_payload) + + checkpoint["next_offset"] = offset + len(batch_rows) + checkpoint["batches_completed"] = batch_number + checkpoint["phase"] = batch_payload["phase"] + checkpoint["confidence"] = batch_payload["confidence"] + checkpoint["next_focus"] = batch_payload["next_focus"] + checkpoint["understanding_version"] = batch_number + checkpoint["last_batch_id"] = batch_id + checkpoint["last_batch_sessions"] = { + "draft": draft_run.get("session_id"), + "critique": critique_run.get("session_id"), + } + write_json(ARCHIVE_CHECKPOINT, checkpoint) + + profile_result = _archive_profile_consolidate_impl() + dpo_result = _archive_dpo_build_impl() + health_result = _archive_pipeline_health_impl() + + return { + "status": "ok", + "batch_id": batch_id, + "batch_number": batch_number, + "tweets_processed": len(batch_rows), + "next_offset": checkpoint["next_offset"], + "knowledge_candidates": len(knowledge_candidates), + "training_examples": len(training_examples), + "profile": profile_result, + "dpo": dpo_result, + "health": health_result, + } @huey.task() @huey.lock_task("know-thy-father") def know_thy_father(): - """Process one batch of Alexander's Twitter archive. + """Process one explicit 50-tweet archive batch into private learning artifacts.""" + return _know_thy_father_impl() - Single batch, no internal loop. Huey schedules the cadence. - Lock prevents overlapping runs. Timmy reads his own prior notes, - processes the next chunk, updates his understanding, and checkpoints. - """ - is_first_run = not ARCHIVE_CHECKPOINT.exists() - prompt = ARCHIVE_FIRST_RUN_PROMPT if is_first_run else ARCHIVE_PROMPT +def _archive_weekly_insights_impl(): + ensure_archive_layout() + profile = read_json(ARCHIVE_PROFILE_FILE, {"claims": []}) + if not profile.get("claims"): + return {"status": "error", "reason": "profile is empty; run know_thy_father first"} - response = hermes_local( - prompt=prompt, - caller_tag="know-thy-father", - toolsets="file,terminal", - ) + recent_batches = [] + for path in sorted(ARCHIVE_CANDIDATES_DIR.glob("batch_*.json"))[-3:]: + batch = read_json(path, {}) + recent_batches.append( + { + "batch_id": batch.get("batch_id", path.stem), + "tweet_ids": batch.get("tweet_ids", [])[:10], + "next_focus": batch.get("next_focus"), + "knowledge_candidates": batch.get("knowledge_candidates", [])[:5], + } + ) - if not response: - return {"status": "error", "reason": "hermes_local returned None"} + prompt = build_weekly_insight_prompt(profile=profile, recent_batches=recent_batches) + insight_run = run_hermes_local(prompt=prompt, caller_tag="archive-weekly-insights") + if not insight_run: + return {"status": "error", "reason": "insight pass failed"} - # Read checkpoint to report progress try: - cp = json.loads(ARCHIVE_CHECKPOINT.read_text()) - except Exception: - cp = {} + insight_payload = extract_first_json_object(insight_run["response"]) + except ValueError: + return {"status": "error", "reason": "insight pass did not return JSON"} + date_key = datetime.now(timezone.utc).strftime("%Y%m%d") + weekly_file = ARCHIVE_INSIGHTS_DIR / f"weekly_{date_key}.md" + opportunities_file = ARCHIVE_INSIGHTS_DIR / "opportunities.json" + markdown_report = str(insight_payload.get("markdown_report") or "").strip() + opportunities = [] + for item in insight_payload.get("opportunities", []): + opportunity = { + "id": str(item.get("id") or f"opportunity-{len(opportunities) + 1}").strip(), + "theme": str(item.get("theme") or "").strip(), + "insight": str(item.get("insight") or "").strip(), + "why_it_matters": str(item.get("why_it_matters") or "").strip(), + "evidence_tweet_ids": [str(tweet_id) for tweet_id in item.get("evidence_tweet_ids", []) if str(tweet_id).strip()], + "suggested_action": str(item.get("suggested_action") or "").strip(), + "confidence": round(float(item.get("confidence", 0.0) or 0.0), 3), + "time_horizon": str(item.get("time_horizon") or "this week").strip(), + } + if opportunity["theme"] and opportunity["insight"] and opportunity["suggested_action"]: + opportunities.append(opportunity) + + write_text(weekly_file, markdown_report) + write_json(opportunities_file, {"generated_at": datetime.now(timezone.utc).isoformat(), "opportunities": opportunities}) + + checkpoint = load_archive_checkpoint() + checkpoint["last_insight_file"] = weekly_file.name + write_json(ARCHIVE_CHECKPOINT, checkpoint) + archive_progress_snapshot() return { "status": "ok", - "batch": cp.get("batches_completed", 0), - "phase": cp.get("phase", "unknown"), - "confidence": cp.get("confidence", "unknown"), + "weekly_file": weekly_file.name, + "opportunities": len(opportunities), + "session_id": insight_run.get("session_id"), + } + + +@huey.task() +def archive_weekly_insights(): + """Generate the private weekly insight brief from the current profile.""" + return _archive_weekly_insights_impl() + + +def _archive_train_adapter_impl(): + ensure_archive_layout() + counts = archive_counts() + state = load_train_state() + eval_gate = latest_eval_gate() + if state.get("awaiting_eval"): + if not eval_gate or not eval_gate.get("pass"): + return { + "status": "blocked", + "reason": "latest candidate eval is missing or still red", + "last_candidate_id": state.get("last_candidate_id"), + "eval": eval_gate, + } + + new_pairs = max(0, counts["total_pairs"] - int(state.get("last_total_pairs", 0) or 0)) + new_batches = max(0, counts["total_batches"] - int(state.get("last_total_batches", 0) or 0)) + if new_pairs < 200 and new_batches < 10: + return { + "status": "not-ready", + "new_pairs": new_pairs, + "new_batches": new_batches, + "threshold": {"pairs": 200, "batches": 10}, + } + + pipeline_config = load_pipeline_config() + train_command = str(pipeline_config.get("train_command") or "").strip() + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + candidate_id = f"timmy-archive-{timestamp}" + run_log = ARCHIVE_TRAINING_RUNS_DIR / f"train_{timestamp}.log" + run_manifest = { + "status": "ready" if not train_command else "started", + "candidate_id": candidate_id, + "new_pairs": new_pairs, + "new_batches": new_batches, + "train_command": train_command or None, + "created_at": datetime.now(timezone.utc).isoformat(), + } + + if not train_command: + write_json(ARCHIVE_TRAINING_RUNS_DIR / f"train_{timestamp}.json", run_manifest) + return run_manifest + + env = os.environ.copy() + env.update(training_command_env()) + result = subprocess.run( + ["/bin/zsh", "-lc", train_command], + cwd=str(TIMMY_HOME), + capture_output=True, + text=True, + timeout=3600, + env=env, + ) + run_log.write_text((result.stdout or "") + ("\n" + result.stderr if result.stderr else "")) + run_manifest["exit_code"] = result.returncode + run_manifest["log_file"] = run_log.name + run_manifest["status"] = "ok" if result.returncode == 0 else "error" + write_json(ARCHIVE_TRAINING_RUNS_DIR / f"train_{timestamp}.json", run_manifest) + + if result.returncode == 0: + state.update( + { + "last_total_batches": counts["total_batches"], + "last_total_pairs": counts["total_pairs"], + "last_candidate_id": candidate_id, + "awaiting_eval": True, + "last_run_status": "ok", + "last_run_at": datetime.now(timezone.utc).isoformat(), + } + ) + write_json(ARCHIVE_TRAIN_STATE_FILE, state) + else: + state.update( + { + "last_run_status": "error", + "last_run_at": datetime.now(timezone.utc).isoformat(), + } + ) + write_json(ARCHIVE_TRAIN_STATE_FILE, state) + return run_manifest + + +@huey.task() +def archive_train_adapter(): + """Train an archive-reading adapter when DPO thresholds and eval gates allow.""" + return _archive_train_adapter_impl() + + +def _archive_promote_candidate_impl(): + eval_gate = latest_eval_gate() + if not eval_gate: + return {"status": "blocked", "reason": "missing eval file"} + if not eval_gate.get("pass"): + write_json( + ARCHIVE_PROMOTION_STATE_FILE, + { + "status": "blocked", + "reason": "promotion gate failed", + "evaluated_at": datetime.now(timezone.utc).isoformat(), + "eval": eval_gate, + }, + ) + return {"status": "blocked", "eval": eval_gate} + + pipeline_config = load_pipeline_config() + promote_command = str(pipeline_config.get("promote_command") or "").strip() + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + decision = { + "status": "ready" if not promote_command else "started", + "candidate_id": eval_gate.get("candidate_id"), + "rollback_model": eval_gate.get("rollback_model"), + "evaluated_at": datetime.now(timezone.utc).isoformat(), + "eval": eval_gate, + } + + if promote_command: + env = os.environ.copy() + env.update(training_command_env()) + env["TIMMY_ARCHIVE_CANDIDATE_ID"] = str(eval_gate.get("candidate_id") or "") + result = subprocess.run( + ["/bin/zsh", "-lc", promote_command], + cwd=str(TIMMY_HOME), + capture_output=True, + text=True, + timeout=1200, + env=env, + ) + log_path = ARCHIVE_TRAINING_RUNS_DIR / f"promote_{timestamp}.log" + log_path.write_text((result.stdout or "") + ("\n" + result.stderr if result.stderr else "")) + decision["status"] = "ok" if result.returncode == 0 else "error" + decision["exit_code"] = result.returncode + decision["log_file"] = log_path.name + if result.returncode != 0: + write_json(ARCHIVE_PROMOTION_STATE_FILE, decision) + return decision + + write_json( + ARCHIVE_ACTIVE_MODEL_FILE, + { + "candidate_id": eval_gate.get("candidate_id"), + "rollback_model": eval_gate.get("rollback_model"), + "promoted_at": datetime.now(timezone.utc).isoformat(), + }, + ) + write_json(ARCHIVE_PROMOTION_STATE_FILE, decision) + state = load_train_state() + state["awaiting_eval"] = False + state["last_run_status"] = "promoted" + write_json(ARCHIVE_TRAIN_STATE_FILE, state) + return decision + + +@huey.task() +def archive_promote_candidate(): + """Promote an archive candidate model only when offline eval gates pass.""" + return _archive_promote_candidate_impl() + + +@huey.periodic_task(crontab(hour="*/4", minute="15")) +def archive_pipeline_tick(): + """Advance the private archive learning loop on a regular cadence.""" + batch = _know_thy_father_impl() + train = _archive_train_adapter_impl() + promote = _archive_promote_candidate_impl() + insight = {"status": "skipped"} + if datetime.now(timezone.utc).weekday() == 0: + expected = f"weekly_{datetime.now(timezone.utc).strftime('%Y%m%d')}.md" + if not (ARCHIVE_INSIGHTS_DIR / expected).exists(): + insight = _archive_weekly_insights_impl() + return { + "batch": batch, + "train": train, + "promote": promote, + "insight": insight, + "health": _archive_pipeline_health_impl(), } @@ -512,8 +1369,8 @@ def heartbeat_tick(): if not perception.get("gitea_alive"): actions.append("ALERT: Gitea unreachable") health = perception.get("model_health", {}) - if isinstance(health, dict) and not health.get("ollama_running"): - actions.append("ALERT: Ollama not running") + if isinstance(health, dict) and not health.get("local_inference_running"): + actions.append("ALERT: local inference surface not running") decision = { "actions": actions, "severity": "fallback", @@ -569,7 +1426,7 @@ def memory_compress(): # Compress: extract key facts alerts = [] gitea_down_count = 0 - ollama_down_count = 0 + inference_down_count = 0 for t in ticks: for action in t.get("actions", []): @@ -578,8 +1435,8 @@ def memory_compress(): if not p.get("gitea_alive"): gitea_down_count += 1 health = p.get("model_health", {}) - if isinstance(health, dict) and not health.get("ollama_running"): - ollama_down_count += 1 + if isinstance(health, dict) and not health.get("local_inference_running"): + inference_down_count += 1 # Last tick's perception = current state last = ticks[-1].get("perception", {}) @@ -589,7 +1446,7 @@ def memory_compress(): "total_ticks": len(ticks), "alerts": alerts[-10:], # last 10 alerts "gitea_downtime_ticks": gitea_down_count, - "ollama_downtime_ticks": ollama_down_count, + "local_inference_downtime_ticks": inference_down_count, "last_known_state": last, } @@ -623,7 +1480,7 @@ def good_morning_report(): tick_count = 0 alerts = [] gitea_up = True - ollama_up = True + local_inference_up = True if tick_log.exists(): for line in tick_log.read_text().strip().split("\n"): @@ -636,8 +1493,8 @@ def good_morning_report(): if not p.get("gitea_alive"): gitea_up = False h = p.get("model_health", {}) - if isinstance(h, dict) and not h.get("ollama_running"): - ollama_up = False + if isinstance(h, dict) and not h.get("local_inference_running"): + local_inference_up = False except Exception: continue @@ -697,7 +1554,11 @@ def good_morning_report(): if briefing_file.exists(): try: b = json.loads(briefing_file.read_text()) - briefing_summary = f"Yesterday: {b.get('total_ticks', 0)} heartbeat ticks, {b.get('gitea_downtime_ticks', 0)} Gitea downticks, {b.get('ollama_downtime_ticks', 0)} Ollama downticks." + briefing_summary = ( + f"Yesterday: {b.get('total_ticks', 0)} heartbeat ticks, " + f"{b.get('gitea_downtime_ticks', 0)} Gitea downticks, " + f"{b.get('local_inference_downtime_ticks', 0)} local inference downticks." + ) except Exception: pass @@ -709,7 +1570,7 @@ def good_morning_report(): **Heartbeat:** {tick_count} ticks logged overnight. **Gitea:** {"up all night" if gitea_up else "⚠️ had downtime"} -**Ollama:** {"running steady" if ollama_up else "⚠️ had downtime"} +**Local inference:** {"running steady" if local_inference_up else "⚠️ had downtime"} **Model status:** {model_status} **Models on disk:** {len(models_loaded)} ({', '.join(m for m in models_loaded if 'timmy' in m.lower() or 'hermes' in m.lower()) or 'none with our name'}) **Alerts:** {len(alerts)} {'— ' + '; '.join(alerts[-3:]) if alerts else '(clean night)'}