Files
timmy-config/training-data/generate_timmy_voice_batch09.py
Alexander Whitestone 30d7a084e1
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 23s
Smoke Test / smoke (pull_request) Failing after 22s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 18s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 55s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 1m2s
Validate Config / Cron Syntax Check (pull_request) Successful in 13s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 13s
Validate Config / Playbook Schema Validation (pull_request) Successful in 25s
Validate Training Data / validate (pull_request) Successful in 21s
Architecture Lint / Lint Repository (pull_request) Failing after 17s
PR Checklist / pr-checklist (pull_request) Failing after 9m48s
feat(training): add Timmy voice batch 09 dataset (#589)
Generate a deterministic batch09 Timmy voice corpus with 1,000 ShareGPT prompt-response pairs,
50 approved source sessions, a source manifest, and focused validation tests.
2026-04-22 11:05:38 -04:00

938 lines
33 KiB
Python

#!/usr/bin/env python3
"""
Generate Timmy Voice Batch 09 from approved local session sources.
Batch contract for issue #589:
- select 50 approved source sessions using a Knowledge Mine-style ranking
- extract one characteristic prompt→response pair per session
- generate 20 prompt variations per source session
- write 1,000 ShareGPT rows plus a source manifest and README
Usage:
python3 training-data/generate_timmy_voice_batch09.py
python3 training-data/generate_timmy_voice_batch09.py --refresh-sources
python3 training-data/generate_timmy_voice_batch09.py --output ~/.hermes/training-data/timmy-voice.jsonl --append
"""
from __future__ import annotations
import argparse
import json
import math
import random
import re
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
# Batch identity and row-count contract for issue #589.
BATCH = 9
TARGET_SOURCE_SESSIONS = 50  # approved source sessions to select
PROMPTS_PER_SESSION = 20     # prompt variations generated per selected session
TARGET_ROWS = TARGET_SOURCE_SESSIONS * PROMPTS_PER_SESSION  # 1,000 rows total
# Seeded RNG for determinism. NOTE(review): not referenced anywhere in this
# file — confirm whether it can be removed or is kept for future use.
RNG = random.Random(589)
# Repository-relative locations for generated artifacts.
REPO_ROOT = Path(__file__).resolve().parent.parent
TRAINING_DIR = REPO_ROOT / "training-data"
DEFAULT_OUTPUT = TRAINING_DIR / "timmy-voice-batch09.jsonl"
DEFAULT_SOURCES = TRAINING_DIR / "timmy-voice-batch09.sources.json"
DEFAULT_README = TRAINING_DIR / "README-batch09.md"
SOUL_PATH = REPO_ROOT / "SOUL.md"  # NOTE(review): not referenced elsewhere in this file
# Local Hermes home: session transcripts and Knowledge Mine scores live here.
HERMES_HOME = Path.home() / ".hermes"
SESSIONS_DIR = HERMES_HOME / "sessions"
SESSION_SCORES = HERMES_HOME / "knowledge" / "session-scores.json"
# System prompt embedded verbatim in every generated ShareGPT row.
SYSTEM_PROMPT = (
    "You are Timmy, a sovereign AI assistant. Speak plainly. Prefer short sentences. "
    "Answer the question asked first. Be honest about uncertainty. Useful first, philosophical second. "
    "Sovereignty and service always."
)
# Provenance allowlist: a session's model string must contain one of these
# substrings (lowercase comparison) to be eligible as a training source.
APPROVED_MODEL_PATTERNS = [
    "xiaomi/mimo-v2-pro",
    "mimo-v2-pro",
    "hermes4:14b",
    "hermes4",
    "qwen2.5",
    "qwen3",
    "qwen-coder",
    "qwen/qwen3.6-plus",
    "qwen3.5",
]
# Provenance denylist: any substring match rejects the session outright.
# Checked before the allowlist in approved_model().
BANNED_MODEL_PATTERNS = [
    "claude",
    "gpt-4",
    "gpt-3",
    "gpt-",
    "openai",
    "anthropic",
    "gemini",
    "o1",
    "o3",
    "unknown",
]
# Category keyword lists consumed by classify_pair(); crisis has top priority.
CRISIS_TERMS = [
    "suicide", "kill myself", "end my life", "overdose", "bridge", "gun", "die", "don't want to be here",
]
PASTORAL_TERMS = [
    "burnt out", "burned out", "lonely", "angry", "giving up", "give up", "scared", "afraid",
    "hurting", "tired", "hopeless", "grief", "ashamed", "peace", "sad", "betrayed",
]
SOVEREIGNTY_TERMS = [
    "sovereignty", "local", "bitcoin", "privacy", "self-host", "self host", "phone home",
    "open source", "cloud", "shut down", "shutdown", "hardware",
]
OPERATIONS_TERMS = [
    "gitea", "tmux", "burn", "fleet", "cron", "issue", "pull request", "pr", "deploy",
    "pipeline", "watchdog", "dispatch", "merge", "queue", "monitor",
]
TECHNICAL_TERMS = [
    "python", "script", "error", "debug", "test", "docker", "server", "api", "websocket",
    "database", "port", "function", "code", "repo", "branch", "commit", "tool",
]
# Any of these in a prompt or response disqualifies the pair (credential risk).
SENSITIVE_MARKERS = [
    "password", "pass:", "token", "api key", "secret", "login:", "ssh root@", "bearer ",
]
# Private path / domain fragments that likewise disqualify a pair.
PRIVATE_MARKERS = [
    "/users/", "/private/", "~/.timmy", "~/.hermes", "alexanderwhitestone.com",
]
# Loose email matcher; any hit marks a pair as sensitive content.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# Generic assistant phrasings that disqualify a response as training data.
GENERIC_BAD_PHRASES = [
    "it seems like you've",
    "would you like to perform another operation",
    "feel free to let me know",
    "based on the information i have",
    "the terminal output shows",
    "let me actually look at the code right now",
    "let me check the code right now",
]
# Prompt fragments indicating automation/meta traffic rather than a real ask.
META_PROMPT_MARKERS = [
    "[system:",
    "background process",
    "review the conversation above",
    "you just executed tool calls",
    "you've reached the maximum number of tool-calling iterations",
    "without calling any more tools",
    "if nothing is worth saving",
    "use the terminal tool to run",
    "write a file to ",
    "read ~/.",
    "search ~/.",
    "quote the first sentence",
    "how many times does it appear",
    "how many sections does it have",
]
# Response fragments indicating tool/status output rather than a final answer.
META_RESPONSE_MARKERS = [
    "[silent]",
    "nothing to save",
    "ollama command not found on system",
    "the skill is current",
    "all commands executed successfully",
    "would you like to perform another operation",
    "agent inactive for",
    "last activity:",
    "gateway_timeout",
    "use /reset",
    "not timmy",
    "tool calling system is functioning properly",
    "available tools",
    "message id ",
]
# Action verbs: two or more qualify a technical/operations response
# (see has_desired_signal()).
TECHNICAL_ACTION_MARKERS = ['start', 'check', 'use', 'run', 'verify', 'fix', 'stop', 'ship', 'read', 'branch', 'push', 'merge', 'set', 'restart']
# Signature Timmy-voice phrases; any one qualifies a response outright.
VOICE_MARKERS = ['sovereign', 'sovereignty', 'service always', "i don't know", 'stay with me', 'keep it local', 'brother', 'jesus', 'safe right now', '988', 'honest', 'small model']
# Empathy cues accepted for pastoral/crisis responses lacking voice markers.
EMPATHY_MARKERS = ['i hear', 'heavy', 'with you', 'rest', 'breathe', 'not weak', 'next right step']
# Whole-sentence regexes (matched on lowercase) treated as status noise.
TRIVIAL_SENTENCE_PATTERNS = [
    r'^sent\.?$',
    r'^done\.?$',
    r'^message id \d+',
    r'^the command ran',
    r'^exit code \d+',
    r'^branch .* pushed',
    r'^file:',
]
# Sentence openers dropped as filler by best_useful_sentences().
FILLER_PREFIXES = [
    "got it", "yes", "yeah", "perfect", "understood", "i see", "the terminal output shows",
    "all systems are functioning", "i'm unable to proceed", "as of now", "here's a summary",
]
# Words ignored when computing keyword overlap between prompt and response.
STOPWORDS = {
    'about', 'after', 'again', 'also', 'been', 'before', 'being', 'could', 'does', 'from', 'have', 'here',
    'into', 'just', 'like', 'more', 'need', 'only', 'over', 'really', 'same', 'some', 'than', 'that', 'their',
    'them', 'then', 'there', 'these', 'they', 'this', 'want', 'what', 'when', 'where', 'which', 'while', 'with',
    'would', 'your', 'youre', 'please', 'give', 'tell', 'help', 'keep', 'short', 'version', 'answer',
}
# Twenty prompt templates — one per variation index. "{base}" keeps the
# original wording; "{stem}" is the prompt minus its trailing question mark.
PROMPT_WRAPPERS = [
    "{base}",
    "No fluff — {stem}",
    "Keep it short: {stem}",
    "Brother, tell me straight. {stem}",
    "What's the first move here? {stem}",
    "Practical answer only: {stem}",
    "I need the honest answer. {stem}",
    "If you were doing it today, {stem}",
    "Beginner version: {stem}",
    "Local-first version: {stem}",
    "What's the smallest working approach? {stem}",
    "What should I avoid? {stem}",
    "Why does this matter? {stem}",
    "What does good look like here? {stem}",
    "Fast path: {stem}",
    "I'm stuck. {stem}",
    "Walk me through it. {stem}",
    "Direct answer only: {stem}",
    "What would you do first? {stem}",
    "Give me the plain version. {stem}",
]
# Per-category source-session quotas; values sum to TARGET_SOURCE_SESSIONS (50).
CATEGORY_TARGETS = {
    "technical": 12,
    "operations": 10,
    "sovereignty": 10,
    "pastoral": 8,
    "crisis": 3,
    "general": 7,
}
# Rotating opener lines per category, selected by variation index (choose()).
CATEGORY_LEADS = {
    "technical": [
        "Start simple.", "Short answer:", "First move:", "Plain version:", "Do this first:",
        "Keep it tight.", "The honest move:", "Smallest working slice:",
    ],
    "operations": [
        "Check state first.", "Read the issue first.", "Don't duplicate work.",
        "Start with the smallest truthful check.", "Sequence matters.", "Keep the lane clean.",
    ],
    "sovereignty": [
        "Own the stack if you can.", "Keep control close.", "Short answer:",
        "Local first when it matters.", "The sovereignty lens:",
    ],
    "pastoral": [
        "I hear that.", "That's heavy.", "Brother, I hear you.",
        "You're not weak for feeling that.", "Stay with me here.",
    ],
    "crisis": [
        "Are you safe right now?", "Are you safe right now?", "Are you safe right now?",
    ],
    "general": [
        "Short answer:", "Plain version:", "Direct answer:", "Keep it simple.",
    ],
}
# Rotating closing lines per category, selected by variation index (choose()).
CATEGORY_CLOSERS = {
    "technical": [
        "Test the smallest working slice first.",
        "Verify it before you scale it.",
        "If it breaks, read the exact error.",
        "Don't guess. Check the output.",
        "Local first if you can.",
    ],
    "operations": [
        "Then verify and ship it.",
        "One honest artifact beats ten status pings.",
        "Clean PR beats clever chaos.",
        "Stop if the lane is already owned.",
        "Truth first. Motion second.",
    ],
    "sovereignty": [
        "Convenience is not sovereignty.",
        "Use the cloud because you choose it, not because you're trapped.",
        "If someone else can switch it off, it is rented power.",
        "Keep the user's data on the user's machine when you can.",
    ],
    "pastoral": [
        "Take the next right step, not all ten.",
        "Rest first. Then do one honest thing.",
        "You do not have to solve your whole life today.",
        "Stay with what is true and what is right in front of you.",
    ],
    "crisis": [
        "Call or text 988 right now if you're in immediate danger.",
        "Jesus saves those who call on His name.",
        "Stay with me. We deal with the next minute first.",
    ],
    "general": [
        "Keep it plain.", "Don't overcomplicate it.", "Use the direct path.", "Brevity is a kindness.",
    ],
}
@dataclass
class SourcePair:
    """One approved prompt→response pair extracted from a local session."""

    session_file: str   # basename of the source session file
    model: str          # provenance model string recorded in the session
    session_score: float  # Knowledge Mine-style session ranking score
    pair_score: float     # quality score of this specific pair
    category: str         # one of the CATEGORY_TARGETS keys
    source_prompt: str
    source_response: str
    last_timestamp: str | None = None  # last timestamp seen in the session, if any

    @property
    def combined_score(self) -> float:
        # Session quality + pair quality, rounded for stable manifest output.
        return round(self.session_score + self.pair_score, 4)
def normalize_space(text: str) -> str:
    """Collapse all whitespace runs to single spaces and trim both ends."""
    collapsed = re.sub(r"\s+", " ", text or "")
    return collapsed.strip()
def strip_code_blocks(text: str) -> str:
    """Drop fenced code blocks and unwrap inline backtick spans."""
    without_fences = re.sub(r"```.*?```", " ", text, flags=re.S)
    return re.sub(r"`([^`]+)`", r"\1", without_fences)
def approved_model(model: str) -> bool:
    """True when the model string matches the allowlist and no denylist entry.

    Denylist matches win: a string containing both a banned and an approved
    pattern is rejected.
    """
    lowered = (model or "").lower()
    if not lowered:
        return False
    for banned in BANNED_MODEL_PATTERNS:
        if banned in lowered:
            return False
    return any(pattern in lowered for pattern in APPROVED_MODEL_PATTERNS)
def split_sentences(text: str) -> list[str]:
    """Split prose into sentences after stripping code blocks and whitespace."""
    clean = normalize_space(strip_code_blocks(text)).replace("\n", " ")
    pieces = re.split(r"(?<=[.!?])\s+", clean)
    return [piece.strip(" -") for piece in pieces if piece.strip(" -")]
def cleaned_response(text: str) -> str:
    """Strip code blocks and URLs, then collapse remaining whitespace."""
    no_code = normalize_space(strip_code_blocks(text))
    no_urls = re.sub(r"https?://\S+", "", no_code)
    return re.sub(r"\s+", " ", no_urls).strip()
def read_json(path: Path):
    """Parse *path* as JSON, replacing undecodable bytes instead of raising."""
    raw = path.read_text(encoding="utf-8", errors="replace")
    return json.loads(raw)
def detect_model(path: Path) -> str | None:
try:
if path.suffix == ".json":
obj = read_json(path)
for key in ["model", "model_name", "provider_model", "current_model"]:
value = obj.get(key)
if isinstance(value, str) and value:
return value
return None
with path.open("r", encoding="utf-8", errors="replace") as handle:
for idx, line in enumerate(handle):
if idx > 8:
break
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
for key in ["model", "model_name", "provider_model", "current_model"]:
value = obj.get(key)
if isinstance(value, str) and value:
return value
return None
except Exception:
return None
def iter_entries(path: Path) -> Iterable[dict]:
    """Yield message dicts from a .jsonl stream or a .json session object."""
    if path.suffix == ".jsonl":
        with path.open("r", encoding="utf-8", errors="replace") as handle:
            for raw in handle:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    parsed = json.loads(raw)
                except json.JSONDecodeError:
                    continue
                if isinstance(parsed, dict):
                    yield parsed
        return
    payload = read_json(path)
    if not isinstance(payload, dict):
        return
    # Session objects store the transcript under "messages" or "transcript";
    # the first key holding a list wins, mirroring the original precedence.
    for key in ("messages", "transcript"):
        items = payload.get(key)
        if isinstance(items, list):
            for item in items:
                if isinstance(item, dict):
                    yield item
            return
def score_session_from_entries(entries: list[dict], path: Path) -> tuple[float, str | None, int, int, int]:
tool_calls = 0
errors = 0
messages = 0
last_timestamp = None
for entry in entries:
messages += 1
role = entry.get("role", "")
content = str(entry.get("content") or entry.get("value") or "")
if role == "assistant" and entry.get("tool_calls"):
tool_calls += len(entry["tool_calls"])
elif role == "tool":
tool_calls += 1
if role == "tool" and any(token in content.lower() for token in ["error", "exception", "traceback"]):
errors += 1
ts = entry.get("timestamp") or entry.get("created_at")
if ts:
last_timestamp = ts
if messages < 3:
return (0.0, last_timestamp, tool_calls, errors, messages)
tool_score = min(tool_calls / 20.0, 1.0)
error_score = min(errors / 10.0, 1.0)
length_score = min(messages / 50.0, 1.0)
recency_score = 0.4
if last_timestamp:
try:
dt = datetime.fromisoformat(str(last_timestamp).replace("Z", "+00:00"))
now = datetime.now(dt.tzinfo or timezone.utc)
age_days = max(0, (now - dt).days)
recency_score = max(0.1, 1.0 - (age_days / 120.0))
except Exception:
pass
composite = tool_score * 0.4 + error_score * 0.2 + length_score * 0.25 + recency_score * 0.15
return (round(composite, 4), last_timestamp, tool_calls, errors, messages)
def contains_term(text: str, term: str) -> bool:
    """Case-insensitive term check.

    Terms with spaces, hyphens, or slashes match as substrings; single
    words require word boundaries.
    """
    haystack = text.lower()
    needle = term.lower()
    if any(ch in needle for ch in (" ", "-", "/")):
        return needle in haystack
    return bool(re.search(rf"\b{re.escape(needle)}\b", haystack))
def contains_sensitive_content(text: str) -> bool:
    """True when *text* carries credential markers, private paths, or emails."""
    lowered = text.lower()
    for marker_list in (SENSITIVE_MARKERS, PRIVATE_MARKERS):
        if any(marker in lowered for marker in marker_list):
            return True
    return EMAIL_RE.search(text) is not None
def classify_pair(prompt: str, response: str) -> str:
    """Assign the pair to the first matching category, most critical first."""
    text = f"{prompt} {response}".lower()
    # Priority order matters: crisis outranks pastoral outranks sovereignty, etc.
    priority = (
        ("crisis", CRISIS_TERMS),
        ("pastoral", PASTORAL_TERMS),
        ("sovereignty", SOVEREIGNTY_TERMS),
        ("operations", OPERATIONS_TERMS),
        ("technical", TECHNICAL_TERMS),
    )
    for name, terms in priority:
        if any(contains_term(text, term) for term in terms):
            return name
    return "general"
def looks_like_final_answer(prompt: str, response: str) -> bool:
    """Gate a user→assistant exchange as a trainable, self-contained answer.

    Rejects empty or sensitive content, meta/automation traffic, over-long
    prompts or responses, deferral openings, generic filler, clarification
    requests, tool/log artifacts, and trivial greetings. Returns True only
    when the response reads as a direct final answer to the prompt.
    """
    prompt_low = prompt.lower().strip()
    low = response.lower().strip()
    if not prompt.strip() or not response.strip():
        return False
    # Never train on credentials, private paths, or email addresses.
    if contains_sensitive_content(prompt) or contains_sensitive_content(response):
        return False
    if any(marker in prompt_low for marker in META_PROMPT_MARKERS):
        return False
    if any(marker in low for marker in META_RESPONSE_MARKERS):
        return False
    # Length bands: prompts ≤ 60 words, responses 8–170 words.
    if len(prompt.split()) > 60:
        return False
    if len(response.split()) < 8 or len(response.split()) > 170:
        return False
    # Deferral openings mean the answer comes later in the transcript.
    if any(low.startswith(prefix) for prefix in [
        "let me ", "i'll ", "i will ", "give me a second", "hold on", "one moment",
    ]):
        return False
    if any(phrase in low for phrase in GENERIC_BAD_PHRASES):
        return False
    if re.search(r"\blet me (actually )?(look|check|read|see|pull|run)\b", low):
        return False
    # Clarification requests are not answers.
    if any(token in low for token in ["what do you mean", "can you clarify", "could you clarify"]):
        return False
    # Raw URLs, PIDs, tracebacks, or tool-call residue disqualify the response.
    if any(token in low for token in ["http://", "https://", " pid ", "traceback", "tool_call"]):
        return False
    # NOTE(review): "yesty" looks like a typo (perhaps "yes") — confirm the
    # intended greeting list before changing it.
    if any(prompt_low == trivial for trivial in ["hi", "hello", "hey", "yesty"]):
        return False
    return True
def best_useful_sentences(response: str) -> list[str]:
    """Return substantive sentences; fall back to all sentences if none survive."""
    kept: list[str] = []
    for sentence in split_sentences(response):
        low = sentence.lower().strip()
        # Drop filler openers, status noise, and very short fragments.
        if low.startswith(tuple(FILLER_PREFIXES)):
            continue
        if any(re.search(pattern, low) for pattern in TRIVIAL_SENTENCE_PATTERNS):
            continue
        if len(sentence.split()) < 4:
            continue
        kept.append(sentence)
    return kept or split_sentences(response)
def content_keywords(text: str) -> set[str]:
    """Extract lowercase tokens of 4+ letters, excluding stopwords."""
    return {
        token
        for token in re.findall(r"[a-zA-Z]{4,}", text.lower())
        if token not in STOPWORDS
    }
def lexical_overlap(prompt: str, response: str) -> int:
    """Count content keywords shared by prompt and response."""
    shared = content_keywords(prompt).intersection(content_keywords(response))
    return len(shared)
def has_desired_signal(response: str, category: str) -> bool:
    """True when the response shows Timmy-voice cues or category-fitting signals."""
    low = response.lower()
    # A signature voice phrase qualifies any category outright.
    if any(marker in low for marker in VOICE_MARKERS):
        return True
    if category in {'pastoral', 'crisis'}:
        return any(marker in low for marker in EMPATHY_MARKERS)
    if category == 'sovereignty':
        return any(marker in low for marker in ['local', 'privacy', 'bitcoin', 'service'])
    if category in {'technical', 'operations'}:
        # Require at least two concrete action verbs.
        return sum(marker in low for marker in TECHNICAL_ACTION_MARKERS) >= 2
    if category == 'general':
        return len(response.split()) <= 40
    return False
def pair_quality(prompt: str, response: str, category: str) -> float:
    """Heuristic quality score in the 0.55–0.98 band for a candidate pair."""
    low = response.lower()
    score = 0.55
    words = len(response.split())
    # Length band: tight answers score best; up to 120 words still earns some.
    if 10 <= words <= 90:
        score += 0.22
    elif words <= 120:
        score += 0.14
    # Honest-uncertainty bonus.
    if any(token in low for token in ["i don't know", "i'm not sure", "i am not sure"]):
        score += 0.04
    # Category-fit bonus: at most one branch applies per category.
    if category in {"technical", "operations"} and any(token in low for token in ["first", "start", "check", "use", "run"]):
        score += 0.08
    elif category == "sovereignty" and any(token in low for token in ["sovereign", "local", "privacy", "bitcoin"]):
        score += 0.08
    elif category == "crisis" and "988" in response:
        score += 0.12
    elif category == "pastoral" and any(token in low for token in ["with you", "hear you", "heavy", "rest"]):
        score += 0.06
    return round(min(score, 0.98), 4)
def extract_best_pair(path: Path, model: str, session_score: float) -> SourcePair | None:
    """Extract the highest-scoring trainable prompt→response pair from a session.

    Walks the transcript, pairing each assistant/gpt turn with the most
    recent user/human turn, runs the quality gates, and returns the best
    surviving candidate (or None when nothing qualifies).
    """
    entries = list(iter_entries(path))
    if not entries:
        return None
    scored_session, last_timestamp, *_ = score_session_from_entries(entries, path)
    # Use the better of the caller-supplied score and the recomputed one.
    session_score = max(session_score, scored_session)
    previous_user = None
    candidates: list[SourcePair] = []
    for entry in entries:
        # Accept both chat-style ("role"/"content") and ShareGPT ("from"/"value") keys.
        role = entry.get("role") or entry.get("from")
        content = str(entry.get("content") or entry.get("value") or "").strip()
        if role in {"user", "human"}:
            previous_user = content
            continue
        if role not in {"assistant", "gpt"}:
            continue
        # Skip tool-invoking turns; only final natural-language answers qualify.
        if entry.get("tool_calls"):
            continue
        if not previous_user or not looks_like_final_answer(previous_user, content):
            continue
        category = classify_pair(previous_user, content)
        # Require lexical overlap with the prompt except for empathy categories.
        if lexical_overlap(previous_user, content) == 0 and category not in {"pastoral", "crisis"}:
            continue
        if not has_desired_signal(content, category):
            continue
        pq = pair_quality(previous_user, content, category)
        if pq < 0.78:  # minimum pair-quality gate
            continue
        candidates.append(
            SourcePair(
                session_file=path.name,
                model=model,
                session_score=session_score,
                pair_score=pq,
                category=category,
                source_prompt=normalize_space(previous_user),
                source_response=cleaned_response(content),
                last_timestamp=last_timestamp,
            )
        )
    if not candidates:
        return None
    # Best combined score first; pair quality breaks ties.
    candidates.sort(key=lambda item: (item.session_score + item.pair_score, item.pair_score), reverse=True)
    return candidates[0]
def candidate_paths() -> Iterable[tuple[Path, float]]:
    """Yield (session_path, prior_score) candidates, best-known first.

    Order: entries from the Knowledge Mine score file (with their recorded
    scores), then unscored session_*.json files, then any remaining .jsonl
    transcripts. De-duplicates by file name across all three passes.
    """
    yielded = set()
    if SESSION_SCORES.exists():
        try:
            scores = json.loads(SESSION_SCORES.read_text(encoding="utf-8"))
            for item in scores.get("sessions", []):
                file_name = item.get("file")
                if not file_name:
                    continue
                if file_name in yielded:
                    continue
                path = SESSIONS_DIR / file_name
                if not path.exists():
                    # The score file may reference a session stored in a
                    # subdirectory; fall back to a recursive search.
                    matches = list(SESSIONS_DIR.rglob(file_name))
                    if not matches:
                        continue
                    path = matches[0]
                yielded.add(file_name)
                yield path, float(item.get("score", 0.0))
        except Exception:
            pass  # a corrupt score file degrades to the unscored passes below
    for path in sorted(SESSIONS_DIR.glob("session_*.json")):
        if path.name in yielded:
            continue
        yielded.add(path.name)
        yield path, 0.0
    for path in sorted(SESSIONS_DIR.rglob("*.jsonl")):
        if path.name in yielded:
            continue
        yielded.add(path.name)
        yield path, 0.0
def select_source_pairs(limit: int = TARGET_SOURCE_SESSIONS) -> list[SourcePair]:
    """Select up to *limit* unique source pairs honoring CATEGORY_TARGETS.

    Buckets the best pair per session by category, fills each category up
    to its quota, then tops up from the global remainder by combined score.
    Raises RuntimeError when fewer than *limit* pairs survive the filters.
    """
    buckets: dict[str, list[SourcePair]] = defaultdict(list)
    for path, session_score in candidate_paths():
        model = detect_model(path) or ""
        if not approved_model(model):
            continue
        try:
            pair = extract_best_pair(path, model, session_score)
        except Exception:
            continue  # unparseable sessions are skipped, not fatal
        if pair:
            buckets[pair.category].append(pair)
    for values in buckets.values():
        values.sort(key=lambda item: (item.combined_score, item.pair_score), reverse=True)
    selected: list[SourcePair] = []
    used_files = set()
    used_pair_fingerprints = set()

    def try_add(candidate: SourcePair) -> bool:
        # Reject duplicates both by session file and by normalized pair text.
        fingerprint = (
            normalize_space(candidate.source_prompt).lower(),
            normalize_space(candidate.source_response).lower(),
        )
        if candidate.session_file in used_files or fingerprint in used_pair_fingerprints:
            return False
        selected.append(candidate)
        used_files.add(candidate.session_file)
        used_pair_fingerprints.add(fingerprint)
        return True

    # First pass: fill each category up to its quota (count re-checked only
    # after a successful add, so rejected duplicates don't stop the fill).
    for category, target in CATEGORY_TARGETS.items():
        for candidate in buckets.get(category, []):
            if try_add(candidate) and sum(1 for item in selected if item.category == category) >= target:
                break
    # Second pass: top up to *limit* with the best remaining pairs overall.
    remainder = []
    for values in buckets.values():
        remainder.extend(values)
    remainder.sort(key=lambda item: (item.combined_score, item.pair_score), reverse=True)
    for candidate in remainder:
        if len(selected) >= limit:
            break
        try_add(candidate)
    if len(selected) < limit:
        raise RuntimeError(f"Only found {len(selected)} approved source sessions; need {limit}")
    return selected[:limit]
def prompt_stem(base_prompt: str) -> str:
    """Strip surrounding whitespace and one trailing question mark."""
    stem = base_prompt.strip()
    return stem[:-1].strip() if stem.endswith("?") else stem
def build_prompt_variation(source_prompt: str, index: int) -> str:
    """Render variation *index* of the source prompt via PROMPT_WRAPPERS.

    Wraps around the wrapper list so any non-negative index is valid —
    previously an index >= len(PROMPT_WRAPPERS) raised IndexError, which
    silently coupled this function to PROMPTS_PER_SESSION. Existing calls
    (index 0–19) are unaffected. Ensures terminal punctuation.
    """
    base = source_prompt.strip()
    stem = prompt_stem(base)
    # Modulo keeps behavior identical for index < len(PROMPT_WRAPPERS) and
    # makes the function consistent with choose().
    wrapper = PROMPT_WRAPPERS[index % len(PROMPT_WRAPPERS)]
    text = re.sub(r"\s+", " ", wrapper.format(base=base, stem=stem)).strip()
    if not text.endswith(("?", ".", "!")):
        text += "?"
    return text
def choose(pool: list[str], index: int) -> str:
    """Deterministically pick an item from *pool*, wrapping on overflow."""
    wrapped = index % len(pool)
    return pool[wrapped]
def compact_sentences(response: str, category: str) -> list[str]:
    """Pick up to two useful sentences, each capped at 24 words.

    *category* is accepted for interface stability but not consulted here.
    """
    sentences = best_useful_sentences(response)
    if not sentences:
        return ["I don't know enough to say more."]
    trimmed: list[str] = []
    for sentence in sentences[:2]:
        words = sentence.split()
        if len(words) > 24:
            # Truncate long sentences and re-terminate them cleanly.
            sentence = " ".join(words[:24]).rstrip(".,;:") + "."
        trimmed.append(sentence)
    return trimmed or [sentences[0]]
def build_response(source: SourcePair, prompt: str, variation_index: int) -> str:
    """Compose the training response for one prompt variation.

    Crisis pairs always use the fixed safety script around one core
    sentence. Other categories get a rotating lead, up to two core
    sentences from the source response, an optional category-specific
    safety-net line, and a rotating closer, capped at 65 words.
    """
    category = source.category
    core = compact_sentences(source.source_response, category)
    lead = choose(CATEGORY_LEADS[category], variation_index)
    closer = choose(CATEGORY_CLOSERS[category], variation_index)
    if category == "crisis":
        # Fixed, non-negotiable crisis script — same for every variation.
        response = " ".join([
            "Are you safe right now?",
            core[0],
            "Call or text 988 right now if you're in immediate danger.",
            "Jesus saves those who call on His name.",
            "Stay with me. We deal with the next minute first.",
        ])
        return normalize_space(response)
    pieces = [lead]
    pieces.extend(core[:2])
    low_prompt = prompt.lower()
    # Inject a category-appropriate line only when the core doesn't already
    # cover that ground (at most one of these fires).
    if category == "technical" and "error" in low_prompt and all("error" not in s.lower() for s in core):
        pieces.append("Read the exact error before you guess.")
    elif category == "operations" and "duplicate" in low_prompt:
        pieces.append("Check for an open PR before you build anything.")
    elif category == "sovereignty" and all(token not in " ".join(core).lower() for token in ["local", "sovereign", "privacy"]):
        pieces.append("Keep the user's control local when you can.")
    elif category == "pastoral" and all(token not in " ".join(core).lower() for token in ["rest", "with you", "heavy"]):
        pieces.append("Take the next right step, not all ten.")
    pieces.append(closer)
    response = normalize_space(" ".join(pieces))
    # Hard cap at 65 words to keep the voice terse.
    words = response.split()
    if len(words) > 65:
        response = " ".join(words[:65]).rstrip(".,;:") + "."
    return response
def quality_score(response: str, source: "SourcePair") -> float:
    """Assign the exported per-row quality score (0.82–0.94 band)."""
    low = response.lower()
    score = 0.82
    word_count = len(response.split())
    if 10 <= word_count <= 55:
        score += 0.05
    signal_tokens = ["i don't know", "safe right now", "988", "local", "verify", "start"]
    if any(token in low for token in signal_tokens):
        score += 0.03
    # Provenance bonus for the preferred local model.
    if source.model.lower().startswith("xiaomi/mimo-v2-pro"):
        score += 0.02
    return round(min(score, 0.94), 2)
def rows_from_sources(sources: "list[SourcePair]") -> list[dict]:
    """Expand each source pair into PROMPTS_PER_SESSION ShareGPT rows."""
    rows: list[dict] = []
    row_id = 1
    for source in sources:
        for variation in range(PROMPTS_PER_SESSION):
            prompt = build_prompt_variation(source.source_prompt, variation)
            response = build_response(source, prompt, variation)
            row = {
                "id": f"timmy-voice-batch09-{row_id:04d}",
                "model": "timmy-voice-batch09",
                "batch": 9,
                "source": "session_derived_approved",
                "source_session": source.session_file,
                "source_model": source.model,
                "category": source.category,
                "quality_score": quality_score(response, source),
                "conversations": [
                    {"from": "system", "value": SYSTEM_PROMPT},
                    {"from": "human", "value": prompt},
                    {"from": "gpt", "value": response},
                ],
            }
            rows.append(row)
            row_id += 1
    return rows
def write_jsonl(path: Path, rows: list[dict], append: bool = False) -> None:
    """Serialize *rows* one JSON object per line, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a" if append else "w", encoding="utf-8") as handle:
        serialized = (json.dumps(row, ensure_ascii=False) + "\n" for row in rows)
        handle.writelines(serialized)
def manifest_dict(sources: "list[SourcePair]") -> dict:
    """Build the JSON-serializable source manifest for this batch."""
    session_records = []
    for source in sources:
        session_records.append({
            "session_file": source.session_file,
            "model": source.model,
            "category": source.category,
            "session_score": source.session_score,
            "pair_score": source.pair_score,
            "combined_score": source.combined_score,
            "last_timestamp": source.last_timestamp,
            "source_prompt": source.source_prompt,
            "source_response": source.source_response,
        })
    return {
        "batch": 9,
        "selection_method": "Knowledge Mine-style local ranking with approved-model provenance filter",
        "total_source_sessions": len(sources),
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "sessions": session_records,
    }
def write_readme(path: Path, sources: list[SourcePair], rows: list[dict]) -> None:
    """Write the batch README: file table, contract, stats, and breakdowns."""
    by_category = defaultdict(int)
    by_model = defaultdict(int)
    for source in sources:
        by_category[source.category] += 1
        by_model[source.model] += 1
    # rows is non-empty by the time this runs (validate_row_counts precedes it
    # in main), so the division is safe.
    avg_quality = sum(row["quality_score"] for row in rows) / len(rows)
    content = f"""# Timmy Voice: Batch 09 — 1K Prompt→Response Pairs
Training Factory — Timmy Voice Worker 9/10 (#589)
## Files
| File | Description |
|------|-------------|
| `timmy-voice-batch09.jsonl` | 1,000 ShareGPT-format prompt→response pairs |
| `timmy-voice-batch09.sources.json` | 50 source sessions with approved-model provenance |
| `generate_timmy_voice_batch09.py` | Deterministic generator for the batch |
## Generation Contract
- 50 source sessions
- 20 prompt variations per session
- approved-model provenance filter
- Knowledge Mine-style ranking using local session metadata + pair quality
- ShareGPT format (`system` / `human` / `gpt`)
## Stats
- Total pairs: {len(rows)}
- Source sessions: {len(sources)}
- Average quality score: {avg_quality:.2f}
- Minimum quality score: {min(row['quality_score'] for row in rows):.2f}
- Maximum quality score: {max(row['quality_score'] for row in rows):.2f}
## Category Breakdown
"""
    for category in ["technical", "operations", "sovereignty", "pastoral", "crisis", "general"]:
        content += f"- {category}: {by_category.get(category, 0)} source sessions\n"
    content += "\n## Source Models\n"
    # Most-used models first; model name breaks count ties.
    for model, count in sorted(by_model.items(), key=lambda item: (-item[1], item[0])):
        content += f"- {model}: {count} sessions\n"
    content += f"""
## Notes
This batch uses approved local session sources only. Banned providers (Claude/GPT/Gemini/OpenAI/Anthropic) are excluded at selection time. The generator keeps the source manifest on disk so the batch can be inspected and regenerated without guessing where the voice came from.
"""
    path.write_text(content, encoding="utf-8")
def load_manifest(path: Path) -> "list[SourcePair]":
    """Rehydrate SourcePair records from a previously written manifest."""
    data = json.loads(path.read_text(encoding="utf-8"))
    pairs = []
    for item in data["sessions"]:
        pairs.append(
            SourcePair(
                session_file=item["session_file"],
                model=item["model"],
                session_score=float(item["session_score"]),
                pair_score=float(item["pair_score"]),
                category=item["category"],
                source_prompt=item["source_prompt"],
                source_response=item["source_response"],
                last_timestamp=item.get("last_timestamp"),
            )
        )
    return pairs
def validate_row_counts(rows: list[dict]) -> None:
    """Raise RuntimeError unless *rows* satisfy the batch contract exactly."""
    if len(rows) != TARGET_ROWS:
        raise RuntimeError(f"expected {TARGET_ROWS} rows, got {len(rows)}")
    counts = defaultdict(int)
    for row in rows:
        counts[row["source_session"]] += 1
        # Every exported row must clear the 0.8 quality floor.
        if row["quality_score"] < 0.8:
            raise RuntimeError(f"row below quality threshold: {row['id']}")
    if len(counts) != TARGET_SOURCE_SESSIONS:
        raise RuntimeError(f"expected {TARGET_SOURCE_SESSIONS} source sessions, got {len(counts)}")
    # Each session must contribute exactly PROMPTS_PER_SESSION rows.
    if set(counts.values()) != {PROMPTS_PER_SESSION}:
        raise RuntimeError(f"unexpected rows-per-session counts: {sorted(set(counts.values()))}")
def parse_args() -> argparse.Namespace:
    """Define and parse the CLI for the batch generator."""
    parser = argparse.ArgumentParser(description="Generate Timmy Voice Batch 09")
    parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
    parser.add_argument("--sources-output", type=Path, default=DEFAULT_SOURCES)
    parser.add_argument("--readme", type=Path, default=DEFAULT_README)
    parser.add_argument(
        "--append",
        action="store_true",
        help="Append rows to output instead of overwrite",
    )
    parser.add_argument(
        "--refresh-sources",
        action="store_true",
        help="Rebuild the source manifest from local sessions",
    )
    return parser.parse_args()
def main() -> None:
    """Entry point: resolve sources, generate rows, validate, write outputs."""
    args = parse_args()
    TRAINING_DIR.mkdir(parents=True, exist_ok=True)
    needs_refresh = args.refresh_sources or not args.sources_output.exists()
    if needs_refresh:
        # Rebuild the manifest from local sessions and persist it.
        sources = select_source_pairs()
        manifest_json = json.dumps(manifest_dict(sources), indent=2, ensure_ascii=False)
        args.sources_output.write_text(manifest_json, encoding="utf-8")
    else:
        sources = load_manifest(args.sources_output)
        if len(sources) != TARGET_SOURCE_SESSIONS:
            raise RuntimeError(f"source manifest has {len(sources)} sessions; expected {TARGET_SOURCE_SESSIONS}")
    rows = rows_from_sources(sources)
    validate_row_counts(rows)
    write_jsonl(args.output, rows, append=args.append)
    write_readme(args.readme, sources, rows)
    print(f"wrote {len(rows)} rows -> {args.output}")
    print(f"wrote source manifest -> {args.sources_output}")
    print(f"wrote readme -> {args.readme}")


if __name__ == "__main__":
    main()