Compare commits

..

1 Commits

Author SHA1 Message Date
kimi
75a6a498b4 fix: use word-boundary regex for sensitive pattern matching to avoid false positives on max_tokens
The _SENSITIVE_PATTERNS list used simple substring matching, so "token"
matched "max_tokens", causing the distillation pipeline to block facts
about max_tokens parameters. Replaced with compiled regexes using
lookaround assertions so compound terms like max_tokens and num_tokens
are no longer falsely flagged.

Fixes #625

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 16:35:37 -04:00
11 changed files with 232 additions and 258 deletions

View File

@@ -1,4 +1,4 @@
from datetime import UTC, date, datetime
from datetime import date, datetime
from enum import StrEnum
from sqlalchemy import JSON, Boolean, Column, Date, DateTime, Index, Integer, String
@@ -40,13 +40,8 @@ class Task(Base):
deferred_at = Column(DateTime, nullable=True)
# Timestamps
created_at = Column(DateTime, default=lambda: datetime.now(UTC), nullable=False)
updated_at = Column(
DateTime,
default=lambda: datetime.now(UTC),
onupdate=lambda: datetime.now(UTC),
nullable=False,
)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
__table_args__ = (Index("ix_task_state_order", "state", "sort_order"),)
@@ -64,4 +59,4 @@ class JournalEntry(Base):
gratitude = Column(String(500), nullable=True)
energy_level = Column(Integer, nullable=True) # User-reported, 1-10
created_at = Column(DateTime, default=lambda: datetime.now(UTC), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)

View File

@@ -38,56 +38,6 @@ def get_later_tasks(db: Session) -> list[Task]:
)
def _create_mit_tasks(db: Session, titles: list[str | None]) -> list[int]:
"""Create MIT tasks from a list of titles, return their IDs."""
task_ids: list[int] = []
for title in titles:
if title:
task = Task(
title=title,
is_mit=True,
state=TaskState.LATER,
certainty=TaskCertainty.SOFT,
)
db.add(task)
db.commit()
db.refresh(task)
task_ids.append(task.id)
return task_ids
def _create_other_tasks(db: Session, other_tasks: str):
"""Create non-MIT tasks from newline-separated text."""
for line in other_tasks.split("\n"):
line = line.strip()
if line:
task = Task(
title=line,
state=TaskState.LATER,
certainty=TaskCertainty.FUZZY,
)
db.add(task)
def _seed_now_next(db: Session):
"""Set initial NOW/NEXT states when both slots are empty."""
if get_now_task(db) or get_next_task(db):
return
later_tasks = (
db.query(Task)
.filter(Task.state == TaskState.LATER)
.order_by(Task.is_mit.desc(), Task.sort_order)
.all()
)
if later_tasks:
later_tasks[0].state = TaskState.NOW
db.add(later_tasks[0])
db.flush()
if len(later_tasks) > 1:
later_tasks[1].state = TaskState.NEXT
db.add(later_tasks[1])
def promote_tasks(db: Session):
"""Enforce the NOW/NEXT/LATER state machine invariants.
@@ -164,19 +114,63 @@ async def post_morning_ritual(
other_tasks: str = Form(""),
):
"""Process morning ritual: create MITs, other tasks, and set initial states."""
# Create Journal Entry
mit_task_ids = []
journal_entry = JournalEntry(entry_date=date.today())
db.add(journal_entry)
db.commit()
db.refresh(journal_entry)
journal_entry.mit_task_ids = _create_mit_tasks(db, [mit1_title, mit2_title, mit3_title])
# Create MIT tasks
for mit_title in [mit1_title, mit2_title, mit3_title]:
if mit_title:
task = Task(
title=mit_title,
is_mit=True,
state=TaskState.LATER, # Initially LATER, will be promoted
certainty=TaskCertainty.SOFT,
)
db.add(task)
db.commit()
db.refresh(task)
mit_task_ids.append(task.id)
journal_entry.mit_task_ids = mit_task_ids
db.add(journal_entry)
_create_other_tasks(db, other_tasks)
# Create other tasks
for task_title in other_tasks.split("\n"):
task_title = task_title.strip()
if task_title:
task = Task(
title=task_title,
state=TaskState.LATER,
certainty=TaskCertainty.FUZZY,
)
db.add(task)
db.commit()
_seed_now_next(db)
db.commit()
# Set initial NOW/NEXT states
# Set initial NOW/NEXT states after all tasks are created
if not get_now_task(db) and not get_next_task(db):
later_tasks = (
db.query(Task)
.filter(Task.state == TaskState.LATER)
.order_by(Task.is_mit.desc(), Task.sort_order)
.all()
)
if later_tasks:
# Set the highest priority LATER task to NOW
later_tasks[0].state = TaskState.NOW
db.add(later_tasks[0])
db.flush() # Flush to make the change visible for the next query
# Set the next highest priority LATER task to NEXT
if len(later_tasks) > 1:
later_tasks[1].state = TaskState.NEXT
db.add(later_tasks[1])
db.commit() # Commit changes after initial NOW/NEXT setup
return templates.TemplateResponse(
request,

View File

@@ -5,7 +5,7 @@ import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from datetime import UTC, datetime
from datetime import datetime
from pathlib import Path
from fastapi import APIRouter, Form, HTTPException, Request
@@ -219,7 +219,7 @@ async def create_task_form(
raise HTTPException(status_code=400, detail="Task title cannot be empty")
task_id = str(uuid.uuid4())
now = datetime.now(UTC).isoformat()
now = datetime.utcnow().isoformat()
priority = priority if priority in VALID_PRIORITIES else "normal"
with _get_db() as db:
@@ -287,7 +287,7 @@ async def modify_task(
async def _set_status(request: Request, task_id: str, new_status: str):
"""Helper to update status and return refreshed task card."""
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
datetime.utcnow().isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
with _get_db() as db:
db.execute(
@@ -316,7 +316,7 @@ async def api_create_task(request: Request):
raise HTTPException(422, "title is required")
task_id = str(uuid.uuid4())
now = datetime.now(UTC).isoformat()
now = datetime.utcnow().isoformat()
priority = body.get("priority", "normal")
if priority not in VALID_PRIORITIES:
priority = "normal"
@@ -358,7 +358,7 @@ async def api_update_status(task_id: str, request: Request):
raise HTTPException(422, f"Invalid status. Must be one of: {VALID_STATUSES}")
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
datetime.utcnow().isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
with _get_db() as db:
db.execute(

View File

@@ -5,7 +5,7 @@ import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from datetime import UTC, datetime
from datetime import datetime
from pathlib import Path
from fastapi import APIRouter, Form, HTTPException, Request
@@ -144,7 +144,7 @@ async def submit_work_order(
related_files: str = Form(""),
):
wo_id = str(uuid.uuid4())
now = datetime.now(UTC).isoformat()
now = datetime.utcnow().isoformat()
priority = priority if priority in PRIORITIES else "medium"
category = category if category in CATEGORIES else "suggestion"
@@ -211,7 +211,7 @@ async def active_partial(request: Request):
async def _update_status(request: Request, wo_id: str, new_status: str, **extra):
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "rejected") else None
datetime.utcnow().isoformat() if new_status in ("completed", "rejected") else None
)
with _get_db() as db:
sets = ["status=?", "completed_at=COALESCE(?, completed_at)"]

View File

@@ -174,8 +174,15 @@ class ConversationManager:
return None
_TOOL_KEYWORDS = frozenset(
{
def should_use_tools(self, message: str, context: ConversationContext) -> bool:
"""Determine if this message likely requires tools.
Returns True if tools are likely needed, False for simple chat.
"""
message_lower = message.lower().strip()
# Tool keywords that suggest tool usage is needed
tool_keywords = [
"search",
"look up",
"find",
@@ -196,11 +203,10 @@ class ConversationManager:
"shell",
"command",
"install",
}
)
]
_CHAT_ONLY_KEYWORDS = frozenset(
{
# Chat-only keywords that definitely don't need tools
chat_only = [
"hello",
"hi ",
"hey",
@@ -215,47 +221,30 @@ class ConversationManager:
"goodbye",
"tell me about yourself",
"what can you do",
}
)
]
_SIMPLE_QUESTION_PREFIXES = ("what is", "who is", "how does", "why is", "when did", "where is")
_TIME_WORDS = ("today", "now", "current", "latest", "this week", "this month")
# Check for chat-only patterns first
for pattern in chat_only:
if pattern in message_lower:
return False
def _is_chat_only(self, message_lower: str) -> bool:
"""Return True if the message matches a chat-only pattern."""
return any(kw in message_lower for kw in self._CHAT_ONLY_KEYWORDS)
# Check for tool keywords
for keyword in tool_keywords:
if keyword in message_lower:
return True
def _has_tool_keyword(self, message_lower: str) -> bool:
"""Return True if the message contains a tool-related keyword."""
return any(kw in message_lower for kw in self._TOOL_KEYWORDS)
def _is_simple_question(self, message_lower: str) -> bool | None:
"""Check if message is a simple question.
Returns True if it needs tools (real-time info), False if it
doesn't, or None if the message isn't a simple question.
"""
for prefix in self._SIMPLE_QUESTION_PREFIXES:
if message_lower.startswith(prefix):
return any(t in message_lower for t in self._TIME_WORDS)
return None
def should_use_tools(self, message: str, context: ConversationContext) -> bool:
"""Determine if this message likely requires tools.
Returns True if tools are likely needed, False for simple chat.
"""
message_lower = message.lower().strip()
if self._is_chat_only(message_lower):
return False
if self._has_tool_keyword(message_lower):
return True
simple = self._is_simple_question(message_lower)
if simple is not None:
return simple
# Simple questions (starting with what, who, how, why, when, where)
# usually don't need tools unless about current/real-time info
simple_question_words = ["what is", "who is", "how does", "why is", "when did", "where is"]
for word in simple_question_words:
if message_lower.startswith(word):
# Check if it's asking about current/real-time info
time_words = ["today", "now", "current", "latest", "this week", "this month"]
if any(t in message_lower for t in time_words):
return True
return False
# Default: don't use tools for unclear cases
return False

View File

@@ -30,7 +30,7 @@ import shutil
import sqlite3
import uuid
from contextlib import closing
from datetime import UTC, datetime
from datetime import datetime
from pathlib import Path
import httpx
@@ -196,7 +196,7 @@ def _bridge_to_work_order(title: str, body: str, category: str) -> None:
body,
category,
"timmy-thinking",
datetime.now(UTC).isoformat(),
datetime.utcnow().isoformat(),
),
)
conn.commit()

View File

@@ -392,26 +392,31 @@ def _build_insights(
return insights or ["Conversations look healthy. Keep up the good work."]
def _format_recurring_topics(repeated: list[tuple[str, int]]) -> list[str]:
"""Format the recurring-topics section of the reflection report."""
if repeated:
lines = ["### Recurring Topics"]
for word, count in repeated:
lines.append(f'- "{word}" ({count} mentions)')
lines.append("")
return lines
return ["### Recurring Topics\nNo strong patterns detected.\n"]
def self_reflect(limit: int = 30) -> str:
"""Review recent conversations and reflect on Timmy's own behavior.
Scans past session entries for patterns: low-confidence responses,
errors, repeated topics, and conversation quality signals. Returns
a structured reflection that Timmy can use to improve.
def _build_reflection_report(
entries: list[dict],
errors: list[dict],
timmy_msgs: list[dict],
user_msgs: list[dict],
low_conf: list[dict],
repeated: list[tuple[str, int]],
) -> str:
"""Assemble the full self-reflection report from analysed data."""
Args:
limit: How many recent entries to review (default 30).
Returns:
A formatted self-reflection report.
"""
sl = get_session_logger()
sl.flush()
entries = sl.get_recent_entries(limit=limit)
if not entries:
return "No conversation history to reflect on yet."
_messages, errors, timmy_msgs, user_msgs = _categorize_entries(entries)
low_conf = _find_low_confidence(timmy_msgs)
repeated = _find_repeated_topics(user_msgs)
# Build reflection report
sections: list[str] = ["## Self-Reflection Report\n"]
sections.append(
f"Reviewed {len(entries)} recent entries: "
@@ -441,37 +446,16 @@ def _build_reflection_report(
)
)
sections.extend(_format_recurring_topics(repeated))
if repeated:
sections.append("### Recurring Topics")
for word, count in repeated:
sections.append(f'- "{word}" ({count} mentions)')
sections.append("")
else:
sections.append("### Recurring Topics\nNo strong patterns detected.\n")
sections.append("### Insights")
for insight in _build_insights(low_conf, errors, repeated):
sections.append(f"- {insight}")
return "\n".join(sections)
def self_reflect(limit: int = 30) -> str:
"""Review recent conversations and reflect on Timmy's own behavior.
Scans past session entries for patterns: low-confidence responses,
errors, repeated topics, and conversation quality signals. Returns
a structured reflection that Timmy can use to improve.
Args:
limit: How many recent entries to review (default 30).
Returns:
A formatted self-reflection report.
"""
sl = get_session_logger()
sl.flush()
entries = sl.get_recent_entries(limit=limit)
if not entries:
return "No conversation history to reflect on yet."
_messages, errors, timmy_msgs, user_msgs = _categorize_entries(entries)
low_conf = _find_low_confidence(timmy_msgs)
repeated = _find_repeated_topics(user_msgs)
return _build_reflection_report(entries, errors, timmy_msgs, user_msgs, low_conf, repeated)

View File

@@ -39,19 +39,21 @@ _DEFAULT_DB = Path("data/thoughts.db")
# qwen3 and other reasoning models wrap chain-of-thought in <think> tags
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
# Sensitive patterns that must never be stored as facts
_SENSITIVE_PATTERNS = [
"token",
"password",
"secret",
"api_key",
"apikey",
"credential",
".config/",
"/token",
"access_token",
"private_key",
"ssh_key",
# Sensitive patterns that must never be stored as facts.
# Uses compiled regexes with word boundaries so that compound technical
# terms like "max_tokens" or "num_tokens" are NOT falsely flagged.
_SENSITIVE_RE = [
re.compile(r"(?<![a-z_])token(?![a-z_])", re.IGNORECASE), # "token" but not "max_tokens"
re.compile(r"password", re.IGNORECASE),
re.compile(r"secret", re.IGNORECASE),
re.compile(r"api_key", re.IGNORECASE),
re.compile(r"apikey", re.IGNORECASE),
re.compile(r"credential", re.IGNORECASE),
re.compile(r"\.config/"),
re.compile(r"/token\b"),
re.compile(r"access_token", re.IGNORECASE),
re.compile(r"private_key", re.IGNORECASE),
re.compile(r"ssh_key", re.IGNORECASE),
]
# Meta-observation phrases to filter out from distilled facts
@@ -548,7 +550,7 @@ class ThinkingEngine:
fact_lower = fact.lower()
# Block sensitive information
if any(pat in fact_lower for pat in _SENSITIVE_PATTERNS):
if any(pat.search(fact) for pat in _SENSITIVE_RE):
logger.warning("Distill: blocked sensitive fact: %s", fact[:60])
continue

View File

@@ -89,31 +89,45 @@ def list_swarm_agents() -> dict[str, Any]:
}
def _find_kimi_cli() -> str | None:
"""Return the path to the kimi CLI binary, or None if not installed."""
def delegate_to_kimi(task: str, working_directory: str = "") -> dict[str, Any]:
"""Delegate a coding task to Kimi, the external coding agent.
Kimi has 262K context and is optimized for code tasks: writing,
debugging, refactoring, test writing. Timmy thinks and plans,
Kimi executes bulk code changes.
Args:
task: Clear, specific coding task description. Include file paths
and expected behavior. Good: "Fix the bug in src/timmy/session.py
where sessions don't persist." Bad: "Fix all bugs."
working_directory: Directory for Kimi to work in. Defaults to repo root.
Returns:
Dict with success status and Kimi's output or error.
"""
import shutil
return shutil.which("kimi")
def _resolve_workdir(working_directory: str) -> str | dict[str, Any]:
"""Return a validated working directory path, or an error dict."""
import subprocess
from pathlib import Path
from config import settings
kimi_path = shutil.which("kimi")
if not kimi_path:
return {
"success": False,
"error": "kimi CLI not found on PATH. Install with: pip install kimi-cli",
}
workdir = working_directory or settings.repo_root
if not Path(workdir).is_dir():
return {
"success": False,
"error": f"Working directory does not exist: {workdir}",
}
return workdir
cmd = [kimi_path, "--print", "-p", task]
def _run_kimi(cmd: list[str], workdir: str) -> dict[str, Any]:
"""Execute the kimi subprocess and return a result dict."""
import subprocess
logger.info("Delegating to Kimi: %s (cwd=%s)", task[:80], workdir)
try:
result = subprocess.run(
@@ -143,34 +157,3 @@ def _run_kimi(cmd: list[str], workdir: str) -> dict[str, Any]:
"success": False,
"error": f"Failed to run Kimi: {exc}",
}
def delegate_to_kimi(task: str, working_directory: str = "") -> dict[str, Any]:
"""Delegate a coding task to Kimi, the external coding agent.
Kimi has 262K context and is optimized for code tasks: writing,
debugging, refactoring, test writing. Timmy thinks and plans,
Kimi executes bulk code changes.
Args:
task: Clear, specific coding task description. Include file paths
and expected behavior. Good: "Fix the bug in src/timmy/session.py
where sessions don't persist." Bad: "Fix all bugs."
working_directory: Directory for Kimi to work in. Defaults to repo root.
Returns:
Dict with success status and Kimi's output or error.
"""
kimi_path = _find_kimi_cli()
if not kimi_path:
return {
"success": False,
"error": "kimi CLI not found on PATH. Install with: pip install kimi-cli",
}
workdir = _resolve_workdir(working_directory)
if isinstance(workdir, dict):
return workdir
logger.info("Delegating to Kimi: %s (cwd=%s)", task[:80], workdir)
return _run_kimi([kimi_path, "--print", "-p", task], workdir)

View File

@@ -86,40 +86,6 @@ def _pip_snapshot(mood: str, confidence: float) -> dict:
return pip_familiar.snapshot().to_dict()
def _resolve_mood(state) -> str:
"""Map cognitive mood/engagement to a presence mood string."""
if state.engagement == "idle" and state.mood == "settled":
return "calm"
return _MOOD_MAP.get(state.mood, "calm")
def _resolve_confidence(state) -> float:
"""Compute normalised confidence from cognitive tracker state."""
if state._confidence_count > 0:
raw = state._confidence_sum / state._confidence_count
else:
raw = 0.7
return round(max(0.0, min(1.0, raw)), 2)
def _build_active_threads(state) -> list[dict]:
"""Convert active commitments into presence thread dicts."""
return [
{"type": "thinking", "ref": c[:80], "status": "active"}
for c in state.active_commitments[:10]
]
def _build_environment() -> dict:
"""Return the environment section using local wall-clock time."""
local_now = datetime.now()
return {
"time_of_day": _time_of_day(local_now.hour),
"local_time": local_now.strftime("%-I:%M %p"),
"day_of_week": local_now.strftime("%A"),
}
def get_state_dict() -> dict:
"""Build presence state dict from current cognitive state.
@@ -132,19 +98,37 @@ def get_state_dict() -> dict:
state = cognitive_tracker.get_state()
now = datetime.now(UTC)
mood = _resolve_mood(state)
confidence = _resolve_confidence(state)
# Map cognitive mood to presence mood
mood = _MOOD_MAP.get(state.mood, "calm")
if state.engagement == "idle" and state.mood == "settled":
mood = "calm"
# Confidence from cognitive tracker
if state._confidence_count > 0:
confidence = state._confidence_sum / state._confidence_count
else:
confidence = 0.7
# Build active threads from commitments
threads = []
for commitment in state.active_commitments[:10]:
threads.append({"type": "thinking", "ref": commitment[:80], "status": "active"})
# Activity
activity = _ACTIVITY_MAP.get(state.engagement, "idle")
# Environment
local_now = datetime.now()
return {
"version": 1,
"liveness": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"current_focus": state.focus_topic or "",
"active_threads": _build_active_threads(state),
"active_threads": threads,
"recent_events": [],
"concerns": [],
"mood": mood,
"confidence": confidence,
"confidence": round(max(0.0, min(1.0, confidence)), 2),
"energy": round(_current_energy(), 2),
"identity": {
"name": "Timmy",
@@ -159,7 +143,11 @@ def get_state_dict() -> dict:
"visitor_present": False,
"conversation_turns": state.conversation_depth,
},
"environment": _build_environment(),
"environment": {
"time_of_day": _time_of_day(local_now.hour),
"local_time": local_now.strftime("%-I:%M %p"),
"day_of_week": local_now.strftime("%A"),
},
"familiar": _pip_snapshot(mood, confidence),
"meta": {
"schema_version": 1,

View File

@@ -1188,3 +1188,42 @@ def test_references_real_files_blocks_mixed(tmp_path):
# Mix of real and fake files — should fail because of the fake one
text = "Fix src/timmy/thinking.py and also src/timmy/nonexistent_module.py for the memory leak."
assert ThinkingEngine._references_real_files(text) is False
# ---------------------------------------------------------------------------
# Sensitive-pattern regression: max_tokens must NOT be flagged (#625)
# ---------------------------------------------------------------------------
def test_sensitive_patterns_allow_max_tokens():
"""_SENSITIVE_RE should not flag 'max_tokens' as sensitive (#625)."""
from timmy.thinking import _SENSITIVE_RE
safe_facts = [
"The cascade router passes max_tokens to Ollama provider.",
"max_tokens=request.max_tokens in the completion call.",
"num_tokens defaults to 2048.",
"total_prompt_tokens is tracked in stats.",
]
for fact in safe_facts:
assert not any(pat.search(fact) for pat in _SENSITIVE_RE), (
f"False positive: {fact!r} was flagged as sensitive"
)
def test_sensitive_patterns_still_block_real_secrets():
"""_SENSITIVE_RE should still block actual secrets."""
from timmy.thinking import _SENSITIVE_RE
dangerous_facts = [
"The token is abc123def456.",
"Set password to hunter2.",
"api_key = sk-live-xyz",
"Found credential in .env file.",
"access_token expired yesterday.",
"private_key stored in vault.",
]
for fact in dangerous_facts:
assert any(pat.search(fact) for pat in _SENSITIVE_RE), (
f"Missed secret: {fact!r} was NOT flagged as sensitive"
)