Compare commits

..

1 Commits

Author SHA1 Message Date
kimi
75a6a498b4 fix: use word-boundary regex for sensitive pattern matching to avoid false positives on max_tokens
The _SENSITIVE_PATTERNS list used simple substring matching, so "token"
matched "max_tokens", causing the distillation pipeline to block facts
about max_tokens parameters. Replaced with compiled regexes using
lookaround assertions so compound terms like max_tokens and num_tokens
are no longer falsely flagged.

Fixes #625

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 16:35:37 -04:00
5 changed files with 159 additions and 155 deletions

View File

@@ -38,56 +38,6 @@ def get_later_tasks(db: Session) -> list[Task]:
)
def _create_mit_tasks(db: Session, titles: list[str | None]) -> list[int]:
    """Persist one MIT task per non-empty title and collect the new IDs.

    Falsy titles (``None`` or ``""``) are skipped. Each task is added,
    committed and refreshed on its own before its ``id`` is read.

    Args:
        db: Session used to add, commit and refresh each task.
        titles: Candidate MIT titles; falsy entries are ignored.

    Returns:
        IDs of the created tasks, in the order their titles were given.
    """
    created_ids: list[int] = []
    for mit_title in filter(None, titles):
        mit = Task(
            title=mit_title,
            is_mit=True,
            state=TaskState.LATER,
            certainty=TaskCertainty.SOFT,
        )
        db.add(mit)
        db.commit()
        # Refresh after the commit so the id attribute is loaded before use.
        db.refresh(mit)
        created_ids.append(mit.id)
    return created_ids
def _create_other_tasks(db: Session, other_tasks: str):
    """Add a LATER/FUZZY task for every non-blank line of *other_tasks*.

    Lines are split on ``"\\n"`` and stripped; blank lines are skipped.
    Tasks are only added to the session here -- this function does not commit.

    Args:
        db: Session the new tasks are added to.
        other_tasks: Newline-separated task titles.
    """
    stripped_lines = (raw.strip() for raw in other_tasks.split("\n"))
    for title in stripped_lines:
        if not title:
            continue
        new_task = Task(
            title=title,
            state=TaskState.LATER,
            certainty=TaskCertainty.FUZZY,
        )
        db.add(new_task)
def _seed_now_next(db: Session):
    """Fill the NOW and NEXT slots from LATER tasks when both are empty.

    Does nothing if a NOW or a NEXT task already exists. Otherwise LATER
    tasks are ordered MIT-first, then by ``sort_order``; the first becomes
    NOW and, if there is one, the second becomes NEXT. Changes are added to
    the session (with a flush after NOW) but not committed here.
    """
    if get_now_task(db) or get_next_task(db):
        return

    candidates = (
        db.query(Task)
        .filter(Task.state == TaskState.LATER)
        .order_by(Task.is_mit.desc(), Task.sort_order)
        .all()
    )
    if not candidates:
        return

    now_task = candidates[0]
    now_task.state = TaskState.NOW
    db.add(now_task)
    db.flush()

    if len(candidates) > 1:
        next_task = candidates[1]
        next_task.state = TaskState.NEXT
        db.add(next_task)
def promote_tasks(db: Session):
"""Enforce the NOW/NEXT/LATER state machine invariants.
@@ -164,19 +114,63 @@ async def post_morning_ritual(
other_tasks: str = Form(""),
):
"""Process morning ritual: create MITs, other tasks, and set initial states."""
# Create Journal Entry
mit_task_ids = []
journal_entry = JournalEntry(entry_date=date.today())
db.add(journal_entry)
db.commit()
db.refresh(journal_entry)
journal_entry.mit_task_ids = _create_mit_tasks(db, [mit1_title, mit2_title, mit3_title])
# Create MIT tasks
for mit_title in [mit1_title, mit2_title, mit3_title]:
if mit_title:
task = Task(
title=mit_title,
is_mit=True,
state=TaskState.LATER, # Initially LATER, will be promoted
certainty=TaskCertainty.SOFT,
)
db.add(task)
db.commit()
db.refresh(task)
mit_task_ids.append(task.id)
journal_entry.mit_task_ids = mit_task_ids
db.add(journal_entry)
_create_other_tasks(db, other_tasks)
# Create other tasks
for task_title in other_tasks.split("\n"):
task_title = task_title.strip()
if task_title:
task = Task(
title=task_title,
state=TaskState.LATER,
certainty=TaskCertainty.FUZZY,
)
db.add(task)
db.commit()
_seed_now_next(db)
db.commit()
# Set initial NOW/NEXT states
# Set initial NOW/NEXT states after all tasks are created
if not get_now_task(db) and not get_next_task(db):
later_tasks = (
db.query(Task)
.filter(Task.state == TaskState.LATER)
.order_by(Task.is_mit.desc(), Task.sort_order)
.all()
)
if later_tasks:
# Set the highest priority LATER task to NOW
later_tasks[0].state = TaskState.NOW
db.add(later_tasks[0])
db.flush() # Flush to make the change visible for the next query
# Set the next highest priority LATER task to NEXT
if len(later_tasks) > 1:
later_tasks[1].state = TaskState.NEXT
db.add(later_tasks[1])
db.commit() # Commit changes after initial NOW/NEXT setup
return templates.TemplateResponse(
request,

View File

@@ -174,8 +174,15 @@ class ConversationManager:
return None
_TOOL_KEYWORDS = frozenset(
{
def should_use_tools(self, message: str, context: ConversationContext) -> bool:
"""Determine if this message likely requires tools.
Returns True if tools are likely needed, False for simple chat.
"""
message_lower = message.lower().strip()
# Tool keywords that suggest tool usage is needed
tool_keywords = [
"search",
"look up",
"find",
@@ -196,11 +203,10 @@ class ConversationManager:
"shell",
"command",
"install",
}
)
]
_CHAT_ONLY_KEYWORDS = frozenset(
{
# Chat-only keywords that definitely don't need tools
chat_only = [
"hello",
"hi ",
"hey",
@@ -215,47 +221,30 @@ class ConversationManager:
"goodbye",
"tell me about yourself",
"what can you do",
}
)
]
# Question openers; _is_simple_question() tests the lowered message against
# each of these with str.startswith().
_SIMPLE_QUESTION_PREFIXES = ("what is", "who is", "how does", "why is", "when did", "where is")
# Substrings that mark a simple question as asking for real-time info, in
# which case _is_simple_question() reports that tools are needed.
_TIME_WORDS = ("today", "now", "current", "latest", "this week", "this month")
# Check for chat-only patterns first
for pattern in chat_only:
if pattern in message_lower:
return False
def _is_chat_only(self, message_lower: str) -> bool:
"""Return True if the message matches a chat-only pattern."""
return any(kw in message_lower for kw in self._CHAT_ONLY_KEYWORDS)
# Check for tool keywords
for keyword in tool_keywords:
if keyword in message_lower:
return True
def _has_tool_keyword(self, message_lower: str) -> bool:
"""Return True if the message contains a tool-related keyword."""
return any(kw in message_lower for kw in self._TOOL_KEYWORDS)
def _is_simple_question(self, message_lower: str) -> bool | None:
"""Check if message is a simple question.
Returns True if it needs tools (real-time info), False if it
doesn't, or None if the message isn't a simple question.
"""
for prefix in self._SIMPLE_QUESTION_PREFIXES:
if message_lower.startswith(prefix):
return any(t in message_lower for t in self._TIME_WORDS)
return None
def should_use_tools(self, message: str, context: ConversationContext) -> bool:
"""Determine if this message likely requires tools.
Returns True if tools are likely needed, False for simple chat.
"""
message_lower = message.lower().strip()
if self._is_chat_only(message_lower):
return False
if self._has_tool_keyword(message_lower):
return True
simple = self._is_simple_question(message_lower)
if simple is not None:
return simple
# Simple questions (starting with what, who, how, why, when, where)
# usually don't need tools unless about current/real-time info
simple_question_words = ["what is", "who is", "how does", "why is", "when did", "where is"]
for word in simple_question_words:
if message_lower.startswith(word):
# Check if it's asking about current/real-time info
time_words = ["today", "now", "current", "latest", "this week", "this month"]
if any(t in message_lower for t in time_words):
return True
return False
# Default: don't use tools for unclear cases
return False

View File

@@ -39,19 +39,21 @@ _DEFAULT_DB = Path("data/thoughts.db")
# qwen3 and other reasoning models wrap chain-of-thought in <think> tags
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
# Sensitive patterns that must never be stored as facts
_SENSITIVE_PATTERNS = [
"token",
"password",
"secret",
"api_key",
"apikey",
"credential",
".config/",
"/token",
"access_token",
"private_key",
"ssh_key",
# Sensitive patterns that must never be stored as facts.
# Uses compiled regexes with word boundaries so that compound technical
# terms like "max_tokens" or "num_tokens" are NOT falsely flagged.
_SENSITIVE_RE = [
re.compile(r"(?<![a-z_])token(?![a-z_])", re.IGNORECASE), # "token" but not "max_tokens"
re.compile(r"password", re.IGNORECASE),
re.compile(r"secret", re.IGNORECASE),
re.compile(r"api_key", re.IGNORECASE),
re.compile(r"apikey", re.IGNORECASE),
re.compile(r"credential", re.IGNORECASE),
re.compile(r"\.config/"),
re.compile(r"/token\b"),
re.compile(r"access_token", re.IGNORECASE),
re.compile(r"private_key", re.IGNORECASE),
re.compile(r"ssh_key", re.IGNORECASE),
]
# Meta-observation phrases to filter out from distilled facts
@@ -548,7 +550,7 @@ class ThinkingEngine:
fact_lower = fact.lower()
# Block sensitive information
if any(pat in fact_lower for pat in _SENSITIVE_PATTERNS):
if any(pat.search(fact) for pat in _SENSITIVE_RE):
logger.warning("Distill: blocked sensitive fact: %s", fact[:60])
continue

View File

@@ -89,41 +89,52 @@ def list_swarm_agents() -> dict[str, Any]:
}
def _find_kimi_cli() -> str | None:
"""Return the path to the kimi CLI, or None if not installed."""
import shutil
def delegate_to_kimi(task: str, working_directory: str = "") -> dict[str, Any]:
"""Delegate a coding task to Kimi, the external coding agent.
return shutil.which("kimi")
Kimi has 262K context and is optimized for code tasks: writing,
debugging, refactoring, test writing. Timmy thinks and plans,
Kimi executes bulk code changes.
Args:
task: Clear, specific coding task description. Include file paths
and expected behavior. Good: "Fix the bug in src/timmy/session.py
where sessions don't persist." Bad: "Fix all bugs."
working_directory: Directory for Kimi to work in. Defaults to repo root.
def _resolve_workdir(working_directory: str) -> str | dict[str, Any]:
"""Resolve and validate the working directory.
Returns the resolved path string, or an error dict if invalid.
Returns:
Dict with success status and Kimi's output or error.
"""
import shutil
import subprocess
from pathlib import Path
from config import settings
kimi_path = shutil.which("kimi")
if not kimi_path:
return {
"success": False,
"error": "kimi CLI not found on PATH. Install with: pip install kimi-cli",
}
workdir = working_directory or settings.repo_root
if not Path(workdir).is_dir():
return {
"success": False,
"error": f"Working directory does not exist: {workdir}",
}
return workdir
cmd = [kimi_path, "--print", "-p", task]
def _run_kimi(cmd: list[str], workdir: str) -> dict[str, Any]:
"""Execute the kimi subprocess and return a result dict."""
import subprocess
logger.info("Delegating to Kimi: %s (cwd=%s)", task[:80], workdir)
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300,
timeout=300, # 5 minute timeout for coding tasks
cwd=workdir,
)
@@ -146,34 +157,3 @@ def _run_kimi(cmd: list[str], workdir: str) -> dict[str, Any]:
"success": False,
"error": f"Failed to run Kimi: {exc}",
}
def delegate_to_kimi(task: str, working_directory: str = "") -> dict[str, Any]:
    """Delegate a coding task to Kimi, the external coding agent.

    Kimi has 262K context and is optimized for code tasks: writing,
    debugging, refactoring, test writing. Timmy thinks and plans,
    Kimi executes bulk code changes.

    Args:
        task: Clear, specific coding task description. Include file paths
            and expected behavior. Good: "Fix the bug in src/timmy/session.py
            where sessions don't persist." Bad: "Fix all bugs."
        working_directory: Directory for Kimi to work in. Defaults to repo root.

    Returns:
        Dict with success status and Kimi's output or error. A blank task,
        a missing kimi CLI, or an invalid working directory each yield
        ``{"success": False, "error": ...}`` without running anything.
    """
    # Reject blank tasks up front: there is nothing to delegate, and this
    # avoids spawning the external CLI for an empty prompt.
    if not task or not task.strip():
        return {
            "success": False,
            "error": "No task provided. Describe the coding task to delegate.",
        }

    kimi_path = _find_kimi_cli()
    if not kimi_path:
        return {
            "success": False,
            "error": "kimi CLI not found on PATH. Install with: pip install kimi-cli",
        }

    # _resolve_workdir returns either the path string or a ready-made error dict.
    workdir = _resolve_workdir(working_directory)
    if isinstance(workdir, dict):
        return workdir

    # Log only the first 80 characters of the task to keep log lines short.
    logger.info("Delegating to Kimi: %s (cwd=%s)", task[:80], workdir)
    return _run_kimi([kimi_path, "--print", "-p", task], workdir)

View File

@@ -1188,3 +1188,42 @@ def test_references_real_files_blocks_mixed(tmp_path):
# Mix of real and fake files — should fail because of the fake one
text = "Fix src/timmy/thinking.py and also src/timmy/nonexistent_module.py for the memory leak."
assert ThinkingEngine._references_real_files(text) is False
# ---------------------------------------------------------------------------
# Sensitive-pattern regression: max_tokens must NOT be flagged (#625)
# ---------------------------------------------------------------------------
def test_sensitive_patterns_allow_max_tokens():
    """Regression for #625: token-count parameter names are not secrets.

    None of the patterns in ``_SENSITIVE_RE`` may match facts that merely
    mention ``max_tokens``, ``num_tokens`` or similar compound names.
    """
    from timmy.thinking import _SENSITIVE_RE

    harmless = (
        "The cascade router passes max_tokens to Ollama provider.",
        "max_tokens=request.max_tokens in the completion call.",
        "num_tokens defaults to 2048.",
        "total_prompt_tokens is tracked in stats.",
    )
    for fact in harmless:
        matched = any(pat.search(fact) for pat in _SENSITIVE_RE)
        assert not matched, f"False positive: {fact!r} was flagged as sensitive"
def test_sensitive_patterns_still_block_real_secrets():
    """Facts that mention genuine secrets must match some ``_SENSITIVE_RE`` pattern."""
    from timmy.thinking import _SENSITIVE_RE

    secrets_mentioned = (
        "The token is abc123def456.",
        "Set password to hunter2.",
        "api_key = sk-live-xyz",
        "Found credential in .env file.",
        "access_token expired yesterday.",
        "private_key stored in vault.",
    )
    for fact in secrets_mentioned:
        matched = any(pat.search(fact) for pat in _SENSITIVE_RE)
        assert matched, f"Missed secret: {fact!r} was NOT flagged as sensitive"