[claude] Fix pre-existing ruff lint errors blocking git hooks (#1247) #1248

Merged
claude merged 1 commits from claude/issue-1247 into main 2026-03-23 23:33:37 +00:00
23 changed files with 116 additions and 130 deletions

View File

@@ -42,19 +42,19 @@ from dashboard.routes.hermes import router as hermes_router
from dashboard.routes.loop_qa import router as loop_qa_router
from dashboard.routes.memory import router as memory_router
from dashboard.routes.mobile import router as mobile_router
from dashboard.routes.nexus import router as nexus_router
from dashboard.routes.models import api_router as models_api_router
from dashboard.routes.models import router as models_router
from dashboard.routes.nexus import router as nexus_router
from dashboard.routes.quests import router as quests_router
from dashboard.routes.scorecards import router as scorecards_router
from dashboard.routes.sovereignty_metrics import router as sovereignty_metrics_router
from dashboard.routes.sovereignty_ws import router as sovereignty_ws_router
from dashboard.routes.three_strike import router as three_strike_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
from dashboard.routes.telegram import router as telegram_router
from dashboard.routes.thinking import router as thinking_router
from dashboard.routes.three_strike import router as three_strike_router
from dashboard.routes.tools import router as tools_router
from dashboard.routes.tower import router as tower_router
from dashboard.routes.voice import router as voice_router

View File

@@ -12,7 +12,7 @@ Routes:
import asyncio
import logging
from datetime import datetime, timezone
from datetime import UTC, datetime
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse
@@ -39,7 +39,7 @@ _nexus_log: list[dict] = []
def _ts() -> str:
return datetime.now(timezone.utc).strftime("%H:%M:%S")
return datetime.now(UTC).strftime("%H:%M:%S")
def _append_log(role: str, content: str) -> None:
@@ -94,9 +94,7 @@ async def nexus_chat(request: Request, message: str = Form(...)):
# Fetch semantically relevant memories to surface in the sidebar
try:
memory_hits = await asyncio.to_thread(
search_memories, query=message, limit=4
)
memory_hits = await asyncio.to_thread(search_memories, query=message, limit=4)
except Exception as exc:
logger.warning("Nexus memory search failed: %s", exc)
memory_hits = []

View File

@@ -101,9 +101,7 @@ async def record_strike(body: RecordRequest) -> dict[str, Any]:
@router.post("/{category}/{key}/automation")
async def register_automation(
category: str, key: str, body: AutomationRequest
) -> dict[str, bool]:
async def register_automation(category: str, key: str, body: AutomationRequest) -> dict[str, bool]:
"""Register an automation artifact to unblock a (category, key) pair."""
detector = get_detector()
detector.register_automation(category, key, body.artifact_path)

View File

@@ -16,7 +16,10 @@ from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from pathlib import Path
from typing import Any
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from infrastructure.router.classifier import TaskComplexity
from config import settings

View File

@@ -13,7 +13,7 @@ from enum import Enum
class TaskComplexity(Enum):
"""Task complexity tier for model routing."""
SIMPLE = "simple" # Qwen3-8B Q6_K: routine, latency-sensitive
SIMPLE = "simple" # Qwen3-8B Q6_K: routine, latency-sensitive
COMPLEX = "complex" # Qwen3-14B Q5_K_M: quality-sensitive, multi-step
@@ -118,12 +118,15 @@ def classify_task(messages: list[dict]) -> TaskComplexity:
return TaskComplexity.SIMPLE
# Concatenate all user-turn content for analysis
user_content = " ".join(
msg.get("content", "")
for msg in messages
if msg.get("role") in ("user", "human")
and isinstance(msg.get("content"), str)
).lower().strip()
user_content = (
" ".join(
msg.get("content", "")
for msg in messages
if msg.get("role") in ("user", "human") and isinstance(msg.get("content"), str)
)
.lower()
.strip()
)
if not user_content:
return TaskComplexity.SIMPLE

View File

@@ -130,9 +130,7 @@ def run_experiment(
"log": output[-2000:], # Keep last 2k chars
"duration_s": duration,
"success": result.returncode == 0,
"error": (
None if result.returncode == 0 else f"Exit code {result.returncode}"
),
"error": (None if result.returncode == 0 else f"Exit code {result.returncode}"),
}
except subprocess.TimeoutExpired:
duration = int(time.monotonic() - start)
@@ -186,13 +184,9 @@ def evaluate_result(
pct = (delta / baseline) * 100 if baseline != 0 else 0.0
if delta < 0:
return (
f"Improvement: {metric_name} {baseline:.4f} -> {current:.4f} ({pct:+.2f}%)"
)
return f"Improvement: {metric_name} {baseline:.4f} -> {current:.4f} ({pct:+.2f}%)"
elif delta > 0:
return (
f"Regression: {metric_name} {baseline:.4f} -> {current:.4f} ({pct:+.2f}%)"
)
return f"Regression: {metric_name} {baseline:.4f} -> {current:.4f} ({pct:+.2f}%)"
else:
return f"No change: {metric_name} = {current:.4f}"
@@ -360,9 +354,7 @@ class SystemExperiment:
"log": output[-3000:],
"duration_s": duration,
"success": result.returncode == 0,
"error": (
None if result.returncode == 0 else f"Exit code {result.returncode}"
),
"error": (None if result.returncode == 0 else f"Exit code {result.returncode}"),
}
except subprocess.TimeoutExpired:
duration = int(time.monotonic() - start)
@@ -443,9 +435,7 @@ class SystemExperiment:
def commit_changes(self, message: str) -> bool:
"""Stage and commit all changes. Returns True on success."""
try:
subprocess.run(
["git", "add", "-A"], cwd=str(self.workspace), check=True, timeout=30
)
subprocess.run(["git", "add", "-A"], cwd=str(self.workspace), check=True, timeout=30)
subprocess.run(
["git", "commit", "-m", message],
cwd=str(self.workspace),
@@ -500,9 +490,7 @@ class SystemExperiment:
``baseline`` (float|None), ``iterations`` (int), ``results`` (list).
"""
if create_branch:
branch_name = (
f"autoresearch/{self.target.replace('/', '-')}-{int(time.time())}"
)
branch_name = f"autoresearch/{self.target.replace('/', '-')}-{int(time.time())}"
self.create_branch(branch_name)
baseline: float | None = self.baseline
@@ -532,9 +520,7 @@ class SystemExperiment:
# Apply edit
edit_result = self.apply_edit(hypothesis, model=model)
edit_failed = "not available" in edit_result or edit_result.startswith(
"Aider error"
)
edit_failed = "not available" in edit_result or edit_result.startswith("Aider error")
if edit_failed:
logger.warning("Edit phase failed: %s", edit_result)

View File

@@ -96,14 +96,10 @@ def _decide_autonomous(req, tool_name: str, tool_args: dict) -> None:
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info(
"AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100]
)
logger.info("AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100])
def _handle_tool_confirmation(
agent, run_output, session_id: str, *, autonomous: bool = False
):
def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous: bool = False):
"""Prompt user to approve/reject dangerous tool calls.
When Agno pauses a run because a tool requires confirmation, this
@@ -177,9 +173,7 @@ def think(
):
"""Ask Timmy to think carefully about a topic."""
timmy = create_timmy(backend=backend, session_id=_CLI_SESSION_ID)
timmy.print_response(
f"Think carefully about: {topic}", stream=True, session_id=_CLI_SESSION_ID
)
timmy.print_response(f"Think carefully about: {topic}", stream=True, session_id=_CLI_SESSION_ID)
def _read_message_input(message: list[str]) -> str:
@@ -252,9 +246,7 @@ def chat(
timmy = create_timmy(backend=backend, session_id=session_id)
run_output = timmy.run(message_str, stream=False, session_id=session_id)
run_output = _handle_tool_confirmation(
timmy, run_output, session_id, autonomous=autonomous
)
run_output = _handle_tool_confirmation(timmy, run_output, session_id, autonomous=autonomous)
content = run_output.content if hasattr(run_output, "content") else str(run_output)
if content:
@@ -308,9 +300,7 @@ def repl(
break
try:
response = loop.run_until_complete(
chat(user_input, session_id=session_id)
)
response = loop.run_until_complete(chat(user_input, session_id=session_id))
if response:
typer.echo(response)
typer.echo()
@@ -373,9 +363,7 @@ def interview(
typer.echo("Starting interview...\n")
transcript = run_interview(
chat_fn=lambda msg: loop.run_until_complete(
chat(msg, session_id="interview")
),
chat_fn=lambda msg: loop.run_until_complete(chat(msg, session_id="interview")),
on_answer=_on_answer,
)
@@ -396,9 +384,7 @@ def interview(
@app.command()
def up(
dev: bool = typer.Option(False, "--dev", help="Enable hot-reload for development"),
build: bool = typer.Option(
True, "--build/--no-build", help="Rebuild images before starting"
),
build: bool = typer.Option(True, "--build/--no-build", help="Rebuild images before starting"),
):
"""Start Timmy Time in Docker (dashboard + agents)."""
cmd = ["docker", "compose"]
@@ -432,18 +418,14 @@ def voice(
"-w",
help="Whisper model: tiny.en, base.en, small.en, medium.en",
),
use_say: bool = typer.Option(
False, "--say", help="Use macOS `say` instead of Piper TTS"
),
use_say: bool = typer.Option(False, "--say", help="Use macOS `say` instead of Piper TTS"),
threshold: float = typer.Option(
0.015,
"--threshold",
"-t",
help="Mic silence threshold (RMS). Lower = more sensitive.",
),
silence: float = typer.Option(
1.5, "--silence", help="Seconds of silence to end recording"
),
silence: float = typer.Option(1.5, "--silence", help="Seconds of silence to end recording"),
backend: str | None = _BACKEND_OPTION,
model_size: str | None = _MODEL_SIZE_OPTION,
):
@@ -487,9 +469,7 @@ def focus(
None,
help='Topic to focus on (e.g. "three-phase loop"). Omit to show current focus.',
),
clear: bool = typer.Option(
False, "--clear", "-c", help="Clear focus and return to broad mode"
),
clear: bool = typer.Option(False, "--clear", "-c", help="Clear focus and return to broad mode"),
):
"""Set deep-focus mode on a single problem.
@@ -525,9 +505,7 @@ def healthcheck(
verbose: bool = typer.Option(
False, "--verbose", "-v", help="Show verbose output including issue details"
),
quiet: bool = typer.Option(
False, "--quiet", "-q", help="Only show status line (no details)"
),
quiet: bool = typer.Option(False, "--quiet", "-q", help="Only show status line (no details)"),
):
"""Quick health snapshot before coding.
@@ -649,9 +627,7 @@ def learn(
typer.echo()
typer.echo(typer.style("Autoresearch", bold=True) + f"{target}")
typer.echo(
f" metric={metric} budget={budget}min max={max_experiments} tox={tox_env}"
)
typer.echo(f" metric={metric} budget={budget}min max={max_experiments} tox={tox_env}")
if dry_run:
typer.echo(" (dry-run — no changes will be made)")
typer.echo()

View File

@@ -7,10 +7,11 @@ Also includes vector similarity utilities (cosine similarity, keyword overlap).
"""
import hashlib
import json
import logging
import math
import json
import httpx # Import httpx for Ollama API calls
import httpx # Import httpx for Ollama API calls
from config import settings
@@ -20,14 +21,21 @@ logger = logging.getLogger(__name__)
EMBEDDING_MODEL = None
EMBEDDING_DIM = 384 # MiniLM dimension, will be overridden if Ollama model has different dim
class OllamaEmbedder:
"""Mimics SentenceTransformer interface for Ollama."""
def __init__(self, model_name: str, ollama_url: str):
self.model_name = model_name
self.ollama_url = ollama_url
self.dimension = 0 # Will be updated after first call
self.dimension = 0 # Will be updated after first call
def encode(self, sentences: str | list[str], convert_to_numpy: bool = False, normalize_embeddings: bool = True) -> list[list[float]] | list[float]:
def encode(
self,
sentences: str | list[str],
convert_to_numpy: bool = False,
normalize_embeddings: bool = True,
) -> list[list[float]] | list[float]:
"""Generate embeddings using Ollama."""
if isinstance(sentences, str):
sentences = [sentences]
@@ -43,9 +51,9 @@ class OllamaEmbedder:
response.raise_for_status()
embedding = response.json()["embedding"]
if not self.dimension:
self.dimension = len(embedding) # Set dimension on first successful call
self.dimension = len(embedding) # Set dimension on first successful call
global EMBEDDING_DIM
EMBEDDING_DIM = self.dimension # Update global EMBEDDING_DIM
EMBEDDING_DIM = self.dimension # Update global EMBEDDING_DIM
all_embeddings.append(embedding)
except httpx.RequestError as exc:
logger.error("Ollama embeddings request failed: %s", exc)
@@ -59,6 +67,7 @@ class OllamaEmbedder:
return all_embeddings[0]
return all_embeddings
def _get_embedding_model():
"""Lazy-load embedding model, preferring Ollama if configured."""
global EMBEDDING_MODEL
@@ -69,8 +78,13 @@ def _get_embedding_model():
return EMBEDDING_MODEL
if settings.timmy_embedding_backend == "ollama":
logger.info("MemorySystem: Using Ollama for embeddings with model %s", settings.ollama_embedding_model)
EMBEDDING_MODEL = OllamaEmbedder(settings.ollama_embedding_model, settings.normalized_ollama_url)
logger.info(
"MemorySystem: Using Ollama for embeddings with model %s",
settings.ollama_embedding_model,
)
EMBEDDING_MODEL = OllamaEmbedder(
settings.ollama_embedding_model, settings.normalized_ollama_url
)
# We don't know the dimension until after the first call, so keep it default for now.
# It will be updated dynamically in OllamaEmbedder.encode
return EMBEDDING_MODEL
@@ -79,7 +93,7 @@ def _get_embedding_model():
from sentence_transformers import SentenceTransformer
EMBEDDING_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
EMBEDDING_DIM = 384 # Reset to MiniLM dimension
EMBEDDING_DIM = 384 # Reset to MiniLM dimension
logger.info("MemorySystem: Loaded local embedding model (all-MiniLM-L6-v2)")
except ImportError:
logger.warning("MemorySystem: sentence-transformers not installed, using fallback")
@@ -107,13 +121,12 @@ def embed_text(text: str) -> list[float]:
if model and model is not False:
embedding = model.encode(text)
# Ensure it's a list of floats, not numpy array
if hasattr(embedding, 'tolist'):
if hasattr(embedding, "tolist"):
return embedding.tolist()
return embedding
return _simple_hash_embedding(text)
def cosine_similarity(a: list[float], b: list[float]) -> float:
"""Calculate cosine similarity between two vectors."""
dot = sum(x * y for x, y in zip(a, b, strict=False))

View File

@@ -1318,11 +1318,11 @@ def memory_store(topic: str, report: str, type: str = "research") -> str:
try:
# Dedup check for facts and research — skip if similar exists
if type in ("fact", "research"):
existing = search_memories(
full_content, limit=3, context_type=type, min_relevance=0.75
)
existing = search_memories(full_content, limit=3, context_type=type, min_relevance=0.75)
if existing:
return f"Similar {type} already stored (id={existing[0].id[:8]}). Skipping duplicate."
return (
f"Similar {type} already stored (id={existing[0].id[:8]}). Skipping duplicate."
)
entry = store_memory(
content=full_content,

View File

@@ -222,9 +222,7 @@ class ThreeStrikeStore:
ThreeStrikeError: On the third (or later) strike with no automation.
"""
if category not in CATEGORIES:
raise ValueError(
f"Unknown category '{category}'. Valid: {sorted(CATEGORIES)}"
)
raise ValueError(f"Unknown category '{category}'. Valid: {sorted(CATEGORIES)}")
now = datetime.now(UTC).isoformat()
meta_json = json.dumps(metadata or {})
@@ -404,9 +402,7 @@ class ThreeStrikeStore:
"""Return all strike records ordered by last seen (most recent first)."""
try:
with closing(self._connect()) as conn:
rows = conn.execute(
"SELECT * FROM strikes ORDER BY last_seen DESC"
).fetchall()
rows = conn.execute("SELECT * FROM strikes ORDER BY last_seen DESC").fetchall()
return [
StrikeRecord(
category=r["category"],

View File

@@ -20,12 +20,12 @@ Sub-modules:
# ``from timmy.tools import <symbol>`` continue to work unchanged.
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_TOOL_USAGE,
AgentTools,
PersonaTools,
ToolStats,
_AGNO_TOOLS_AVAILABLE,
_ImportError,
_TOOL_USAGE,
_track_tool_usage,
get_tool_stats,
)

View File

@@ -11,10 +11,10 @@ logger = logging.getLogger(__name__)
# Lazy imports to handle test mocking
_ImportError = None
try:
from agno.tools import Toolkit
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
from agno.tools import Toolkit # noqa: F401
from agno.tools.file import FileTools # noqa: F401
from agno.tools.python import PythonTools # noqa: F401
from agno.tools.shell import ShellTools # noqa: F401
_AGNO_TOOLS_AVAILABLE = True
except ImportError as e:
@@ -41,7 +41,7 @@ class AgentTools:
agent_id: str
agent_name: str
toolkit: "Toolkit"
toolkit: Toolkit
available_tools: list[str] = field(default_factory=list)

View File

@@ -16,11 +16,11 @@ from pathlib import Path
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
ShellTools,
Toolkit,
_ImportError,
)
from timmy.tools.file_tools import (
_make_smart_read_file,
@@ -363,7 +363,7 @@ AGENT_TOOLKITS: dict[str, Callable[[], Toolkit]] = {
}
def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> "Toolkit | None":
def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> Toolkit | None:
"""Get the appropriate toolkit for an agent.
Args:

View File

@@ -13,16 +13,16 @@ from pathlib import Path
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
Toolkit,
_ImportError,
)
logger = logging.getLogger(__name__)
def _make_smart_read_file(file_tools: "FileTools") -> Callable:
def _make_smart_read_file(file_tools: FileTools) -> Callable:
"""Wrap FileTools.read_file so directories auto-list their contents.
When the user (or the LLM) passes a directory path to read_file,

View File

@@ -17,11 +17,11 @@ from pathlib import Path
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
ShellTools,
Toolkit,
_ImportError,
)
from timmy.tools.file_tools import _make_smart_read_file

View File

@@ -49,8 +49,10 @@ def test_nexus_chat_posts_message(client):
def test_nexus_teach_stores_fact(client):
"""POST /nexus/teach should persist a fact and return confirmation."""
with patch("dashboard.routes.nexus.store_personal_fact") as mock_store, \
patch("dashboard.routes.nexus.recall_personal_facts_with_ids", return_value=[]):
with (
patch("dashboard.routes.nexus.store_personal_fact") as mock_store,
patch("dashboard.routes.nexus.recall_personal_facts_with_ids", return_value=[]),
):
mock_store.return_value = None
response = client.post("/nexus/teach", data={"fact": "Timmy loves Python"})
assert response.status_code == 200

View File

@@ -1,7 +1,5 @@
"""Tests for Qwen3 dual-model task complexity classifier."""
import pytest
from infrastructure.router.classifier import TaskComplexity, classify_task

View File

@@ -39,9 +39,7 @@ class TestPrepareExperiment:
from timmy.autoresearch import prepare_experiment
with patch("timmy.autoresearch.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(
returncode=1, stdout="", stderr="auth failed"
)
mock_run.return_value = MagicMock(returncode=1, stdout="", stderr="auth failed")
result = prepare_experiment(tmp_path)
assert "failed" in result.lower()
@@ -104,9 +102,7 @@ class TestRunExperiment:
(repo_dir / "train.py").write_text("print('done')")
with patch("timmy.autoresearch.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(
returncode=0, stdout="no metrics here", stderr=""
)
mock_run.return_value = MagicMock(returncode=0, stdout="no metrics here", stderr="")
result = run_experiment(tmp_path)
assert result["success"] is True

View File

@@ -572,7 +572,9 @@ class TestMemoryStore:
mock_vector_store["store"].reset_mock()
# Test with 'research'
result = memory_store(topic="Similar research", report="Similar research content", type="research")
result = memory_store(
topic="Similar research", report="Similar research content", type="research"
)
assert "similar" in result.lower() or "duplicate" in result.lower()
mock_vector_store["store"].assert_not_called()
@@ -600,7 +602,9 @@ class TestMemoryStore:
valid_types = ["fact", "conversation", "document", "research"]
for ctx_type in valid_types:
mock_vector_store["store"].reset_mock()
memory_store(topic=f"Topic for {ctx_type}", report=f"Content for {ctx_type}", type=ctx_type)
memory_store(
topic=f"Topic for {ctx_type}", report=f"Content for {ctx_type}", type=ctx_type
)
mock_vector_store["store"].assert_called_once()
def test_memory_store_strips_report_and_adds_topic(self, mock_vector_store):

View File

@@ -190,7 +190,7 @@ class TestThreeStrikeStore:
@pytest.mark.unit
def test_get_events_respects_limit(self, store):
for i in range(5):
for _ in range(5):
try:
store.record("vlm_prompt_edit", "el")
except ThreeStrikeError:

View File

@@ -72,9 +72,7 @@ class TestThreeStrikeRoutes:
"/sovereignty/three-strike/record",
json={"category": "vlm_prompt_edit", "key": "events_test_key"},
)
response = client.get(
"/sovereignty/three-strike/vlm_prompt_edit/events_test_key/events"
)
response = client.get("/sovereignty/three-strike/vlm_prompt_edit/events_test_key/events")
assert response.status_code == 200
data = response.json()
assert data["category"] == "vlm_prompt_edit"

View File

@@ -310,7 +310,9 @@ class TestResearchOrchestrator:
mock_llm_client = MagicMock()
mock_llm_client.completion = AsyncMock(return_value=mock_llm_response)
with patch("timmy.paperclip.google_web_search", new=AsyncMock(return_value=mock_search_results)):
with patch(
"timmy.paperclip.google_web_search", new=AsyncMock(return_value=mock_search_results)
):
with patch("timmy.paperclip.get_llm_client", return_value=mock_llm_client):
report = await orchestrator.run_research_pipeline("test query")
@@ -358,7 +360,10 @@ class TestResearchOrchestrator:
orchestrator.run_research_pipeline = AsyncMock(return_value=mock_report)
orchestrator.post_gitea_comment = AsyncMock()
with patch("timmy.paperclip.triage_research_report", new=AsyncMock(return_value=mock_triage_results)):
with patch(
"timmy.paperclip.triage_research_report",
new=AsyncMock(return_value=mock_triage_results),
):
result = await orchestrator.run({"issue_number": 42})
assert "Research complete for issue #42" in result
@@ -500,7 +505,9 @@ class TestPaperclipPoller:
assert poller.client.update_task_status.call_count == 2
poller.client.update_task_status.assert_any_call("task-1", "running")
poller.client.update_task_status.assert_any_call("task-1", "completed", "Research completed successfully")
poller.client.update_task_status.assert_any_call(
"task-1", "completed", "Research completed successfully"
)
poller.orchestrator.run.assert_called_once_with({"issue_number": 42})
@pytest.mark.asyncio

View File

@@ -321,7 +321,10 @@ async def test_run_cycle_counts_dispatched_issues():
patch(
"timmy.vassal.backlog.fetch_open_issues",
new_callable=AsyncMock,
return_value=[{"number": i, "title": f"Issue {i}", "labels": [], "assignees": []} for i in range(1, 4)],
return_value=[
{"number": i, "title": f"Issue {i}", "labels": [], "assignees": []}
for i in range(1, 4)
],
),
patch(
"timmy.vassal.backlog.triage_issues",
@@ -357,7 +360,10 @@ async def test_run_cycle_respects_max_dispatch_cap():
patch(
"timmy.vassal.backlog.fetch_open_issues",
new_callable=AsyncMock,
return_value=[{"number": i, "title": f"Issue {i}", "labels": [], "assignees": []} for i in range(1, 6)],
return_value=[
{"number": i, "title": f"Issue {i}", "labels": [], "assignees": []}
for i in range(1, 6)
],
),
patch(
"timmy.vassal.backlog.triage_issues",
@@ -392,6 +398,8 @@ def test_resolve_interval_uses_explicit_value():
def test_resolve_interval_falls_back_to_300():
orch = VassalOrchestrator()
with patch("timmy.vassal.orchestration_loop.VassalOrchestrator._resolve_interval") as mock_resolve:
with patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._resolve_interval"
) as mock_resolve:
mock_resolve.return_value = 300.0
assert orch._resolve_interval() == 300.0