[loop-cycle-7] refactor: split research.py into research/ subpackage (#1405) (#1458)
Some checks failed
Tests / lint (push) Successful in 11s
Tests / test (push) Has been cancelled

This commit was merged in pull request #1458.
This commit is contained in:
2026-03-24 19:53:04 +00:00
parent 8518db921e
commit 4d2aeb937f
62 changed files with 1049 additions and 885 deletions

View File

@@ -31,7 +31,16 @@ class TestMonitoringStatusEndpoint:
response = client.get("/monitoring/status")
assert response.status_code == 200
data = response.json()
for key in ("timestamp", "uptime_seconds", "agents", "resources", "economy", "stream", "pipeline", "alerts"):
for key in (
"timestamp",
"uptime_seconds",
"agents",
"resources",
"economy",
"stream",
"pipeline",
"alerts",
):
assert key in data, f"Missing key: {key}"
def test_agents_is_list(self, client):
@@ -48,7 +57,13 @@ class TestMonitoringStatusEndpoint:
response = client.get("/monitoring/status")
data = response.json()
resources = data["resources"]
for field in ("disk_percent", "disk_free_gb", "ollama_reachable", "loaded_models", "warnings"):
for field in (
"disk_percent",
"disk_free_gb",
"ollama_reachable",
"loaded_models",
"warnings",
):
assert field in resources, f"Missing resource field: {field}"
def test_economy_has_expected_fields(self, client):

View File

@@ -71,7 +71,10 @@ class TestAggregateMetricsEdgeCases:
Event(
type="test.execution",
source="ci",
data={"actor": "gemini", "test_files": ["tests/test_alpha.py", "tests/test_beta.py"]},
data={
"actor": "gemini",
"test_files": ["tests/test_alpha.py", "tests/test_beta.py"],
},
),
]
result = _aggregate_metrics(events)

View File

@@ -106,7 +106,12 @@ class TestBudgetTrackerCloudAllowed:
def test_allowed_when_no_spend(self):
tracker = BudgetTracker(db_path=":memory:")
with (
patch.object(type(tracker._get_budget() if hasattr(tracker, "_get_budget") else tracker), "tier_cloud_daily_budget_usd", 5.0, create=True),
patch.object(
type(tracker._get_budget() if hasattr(tracker, "_get_budget") else tracker),
"tier_cloud_daily_budget_usd",
5.0,
create=True,
),
):
# Settings-based check — use real settings (5.0 default, 0 spent)
assert tracker.cloud_allowed() is True
@@ -166,12 +171,14 @@ class TestBudgetTrackerSummary:
class TestGetBudgetTrackerSingleton:
def test_returns_budget_tracker(self):
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
tracker = get_budget_tracker()
assert isinstance(tracker, BudgetTracker)
def test_returns_same_instance(self):
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
t1 = get_budget_tracker()
t2 = get_budget_tracker()

View File

@@ -53,7 +53,15 @@ class TestSpendRecord:
def test_spend_record_with_zero_tokens(self):
"""Test SpendRecord with zero tokens."""
ts = time.time()
record = SpendRecord(ts=ts, provider="openai", model="gpt-4o", tokens_in=0, tokens_out=0, cost_usd=0.0, tier="cloud")
record = SpendRecord(
ts=ts,
provider="openai",
model="gpt-4o",
tokens_in=0,
tokens_out=0,
cost_usd=0.0,
tier="cloud",
)
assert record.tokens_in == 0
assert record.tokens_out == 0
@@ -261,15 +269,11 @@ class TestBudgetTrackerSpendQueries:
# Add record for today
today_ts = datetime.combine(date.today(), datetime.min.time(), tzinfo=UTC).timestamp()
tracker._in_memory.append(
SpendRecord(today_ts + 3600, "test", "model", 0, 0, 1.0, "cloud")
)
tracker._in_memory.append(SpendRecord(today_ts + 3600, "test", "model", 0, 0, 1.0, "cloud"))
# Add old record (2 days ago)
old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
tracker._in_memory.append(
SpendRecord(old_ts, "test", "old_model", 0, 0, 2.0, "cloud")
)
tracker._in_memory.append(SpendRecord(old_ts, "test", "old_model", 0, 0, 2.0, "cloud"))
# Daily should only include today's 1.0
assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)
@@ -448,9 +452,7 @@ class TestBudgetTrackerInMemoryFallback:
tracker = BudgetTracker(db_path=":memory:")
tracker._db_ok = False
old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
tracker._in_memory.append(
SpendRecord(old_ts, "test", "model", 0, 0, 1.0, "cloud")
)
tracker._in_memory.append(SpendRecord(old_ts, "test", "model", 0, 0, 1.0, "cloud"))
# Query for records in last day
since_ts = (datetime.now(UTC) - timedelta(days=1)).timestamp()
result = tracker._query_spend(since_ts)

View File

@@ -368,12 +368,14 @@ class TestTieredModelRouterClassify:
class TestGetTieredRouterSingleton:
def test_returns_tiered_router_instance(self):
import infrastructure.models.router as rmod
rmod._tiered_router = None
router = get_tiered_router()
assert isinstance(router, TieredModelRouter)
def test_singleton_returns_same_instance(self):
import infrastructure.models.router as rmod
rmod._tiered_router = None
r1 = get_tiered_router()
r2 = get_tiered_router()

View File

@@ -25,9 +25,7 @@ def _pcm_tone(ms: int = 10, sample_rate: int = 48000, amplitude: int = 16000) ->
n = sample_rate * ms // 1000
freq = 440 # Hz
samples = [
int(amplitude * math.sin(2 * math.pi * freq * i / sample_rate)) for i in range(n)
]
samples = [int(amplitude * math.sin(2 * math.pi * freq * i / sample_rate)) for i in range(n)]
return struct.pack(f"<{n}h", *samples)

View File

@@ -23,22 +23,27 @@ def mock_files(tmp_path):
return tmp_path
def test_get_prompt(mock_files):
"""Tests that the prompt is read correctly."""
with patch("scripts.llm_triage.PROMPT_PATH", mock_files / "scripts/deep_triage_prompt.md"):
prompt = get_prompt()
assert prompt == "This is the prompt."
def test_get_context(mock_files):
"""Tests that the context is constructed correctly."""
with patch("scripts.llm_triage.QUEUE_PATH", mock_files / ".loop/queue.json"), \
patch("scripts.llm_triage.SUMMARY_PATH", mock_files / ".loop/retro/summary.json"), \
patch("scripts.llm_triage.RETRO_PATH", mock_files / ".loop/retro/deep-triage.jsonl"):
with (
patch("scripts.llm_triage.QUEUE_PATH", mock_files / ".loop/queue.json"),
patch("scripts.llm_triage.SUMMARY_PATH", mock_files / ".loop/retro/summary.json"),
patch("scripts.llm_triage.RETRO_PATH", mock_files / ".loop/retro/deep-triage.jsonl"),
):
context = get_context()
assert "CURRENT QUEUE (.loop/queue.json):\\n[]" in context
assert "CYCLE SUMMARY (.loop/retro/summary.json):\\n{}" in context
assert "LAST DEEP TRIAGE RETRO:\\n" in context
def test_parse_llm_response():
"""Tests that the LLM's response is parsed correctly."""
response = '{"queue": [1, 2, 3], "retro": {"a": 1}}'
@@ -46,6 +51,7 @@ def test_parse_llm_response():
assert queue == [1, 2, 3]
assert retro == {"a": 1}
@patch("scripts.llm_triage.get_llm_client")
@patch("scripts.llm_triage.GiteaClient")
def test_run_triage(mock_gitea_client, mock_llm_client, mock_files):
@@ -66,11 +72,13 @@ def test_run_triage(mock_gitea_client, mock_llm_client, mock_files):
# Check that the queue and retro files were written
assert (mock_files / ".loop/queue.json").read_text() == '[{"issue": 1}]'
assert (mock_files / ".loop/retro/deep-triage.jsonl").read_text() == '{"issues_closed": [2], "issues_created": [{"title": "New Issue", "body": "This is a new issue."}]}\n'
assert (
(mock_files / ".loop/retro/deep-triage.jsonl").read_text()
== '{"issues_closed": [2], "issues_created": [{"title": "New Issue", "body": "This is a new issue."}]}\n'
)
# Check that the Gitea client was called correctly
mock_gitea_client.return_value.close_issue.assert_called_once_with(2)
mock_gitea_client.return_value.create_issue.assert_called_once_with(
"New Issue", "This is a new issue."
)

View File

@@ -28,6 +28,7 @@ def tmp_spark_db(tmp_path, monkeypatch):
def reset_engine():
"""Ensure the engine singleton is cleared between tests."""
from spark.engine import reset_spark_engine
reset_spark_engine()
yield
reset_spark_engine()
@@ -130,6 +131,7 @@ class TestGetSparkEngineSingleton:
mock_settings.spark_enabled = False
with patch("spark.engine.settings", mock_settings, create=True):
from spark.engine import reset_spark_engine
reset_spark_engine()
# Patch at import time by mocking the config module in engine
import spark.engine as engine_module
@@ -238,6 +240,7 @@ class TestDisabledEngineGuards:
def setup_method(self):
from spark.engine import SparkEngine
self.engine = SparkEngine(enabled=False)
def test_on_task_posted_disabled(self):

View File

@@ -95,18 +95,14 @@ class TestNexusIntrospector:
intro = NexusIntrospector()
intro.record_memory_hits(3)
intro.record_memory_hits(2)
snap = intro.snapshot(
conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}]
)
snap = intro.snapshot(conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}])
assert snap.analytics.memory_hits_total == 5
def test_reset_clears_state(self):
intro = NexusIntrospector()
intro.record_memory_hits(10)
intro.reset()
snap = intro.snapshot(
conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}]
)
snap = intro.snapshot(conversation_log=[{"role": "user", "content": "x", "timestamp": "t"}])
assert snap.analytics.memory_hits_total == 0
def test_topics_deduplication(self):

View File

@@ -89,9 +89,7 @@ class TestSovereigntyPulse:
mock_store = MagicMock()
mock_store.get_snapshot.return_value = mock_snapshot
with patch(
"timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store
):
with patch("timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store):
snap = pulse.snapshot()
# Perception: 8/10 = 80%, Decision: 6/10 = 60%, Narration: 10/10 = 100%
@@ -120,9 +118,7 @@ class TestSovereigntyPulse:
mock_store = MagicMock()
mock_store.get_snapshot.return_value = mock_snapshot
with patch(
"timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store
):
with patch("timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store):
snap = pulse.snapshot()
# Total hits: 15, Total calls: 15, Total: 30
@@ -141,9 +137,7 @@ class TestSovereigntyPulse:
mock_store = MagicMock()
mock_store.get_snapshot.return_value = mock_snapshot
with patch(
"timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store
):
with patch("timmy.sovereignty.metrics.get_metrics_store", return_value=mock_store):
snap = pulse.snapshot()
assert snap.overall_pct == 0.0

View File

@@ -148,9 +148,7 @@ class TestScoreScope:
assert score_meta < score_plain
def test_max_is_three(self):
score = _score_scope(
"Fix it", "See src/foo.py and `def bar()` method here", set()
)
score = _score_scope("Fix it", "See src/foo.py and `def bar()` method here", set())
assert score <= 3
@@ -293,9 +291,7 @@ class TestScoreIssue:
assert issue.is_unassigned is True
def test_blocked_issue_detected(self):
raw = _make_raw_issue(
title="Fix blocked deployment", body="Blocked by infra team."
)
raw = _make_raw_issue(title="Fix blocked deployment", body="Blocked by infra team.")
issue = score_issue(raw)
assert issue.is_blocked is True
@@ -421,9 +417,7 @@ class TestBuildAuditComment:
assert KIMI_READY_LABEL in comment
def test_flag_alex_comment(self):
d = TriageDecision(
issue_number=3, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked"
)
d = TriageDecision(issue_number=3, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked")
comment = _build_audit_comment(d)
assert OWNER_LOGIN in comment
@@ -531,9 +525,7 @@ class TestExecuteDecisionLive:
mock_client = AsyncMock()
mock_client.post.return_value = comment_resp
d = TriageDecision(
issue_number=12, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked"
)
d = TriageDecision(issue_number=12, action="flag_alex", agent=OWNER_LOGIN, reason="Blocked")
with patch("timmy.backlog_triage.settings") as mock_settings:
mock_settings.gitea_token = "tok"
@@ -613,10 +605,7 @@ class TestBacklogTriageLoop:
_make_raw_issue(
number=100,
title="[bug] crash in src/timmy/agent.py",
body=(
"## Problem\nCrashes. Expected: runs. "
"Must pass pytest. Should return 200."
),
body=("## Problem\nCrashes. Expected: runs. Must pass pytest. Should return 200."),
labels=["bug"],
assignees=[],
)

View File

@@ -242,7 +242,9 @@ class TestGetOrCreateLabel:
client = MagicMock()
client.get = AsyncMock(return_value=mock_resp)
result = await _get_or_create_label(client, "http://git", {"Authorization": "token x"}, "owner/repo")
result = await _get_or_create_label(
client, "http://git", {"Authorization": "token x"}, "owner/repo"
)
assert result == 42
@pytest.mark.asyncio
@@ -261,7 +263,9 @@ class TestGetOrCreateLabel:
client.get = AsyncMock(return_value=list_resp)
client.post = AsyncMock(return_value=create_resp)
result = await _get_or_create_label(client, "http://git", {"Authorization": "token x"}, "owner/repo")
result = await _get_or_create_label(
client, "http://git", {"Authorization": "token x"}, "owner/repo"
)
assert result == 99
@pytest.mark.asyncio
@@ -518,7 +522,9 @@ class TestIndexKimiArtifact:
mock_entry = MagicMock()
mock_entry.id = "mem-123"
with patch("timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock) as mock_thread:
with patch(
"timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock
) as mock_thread:
mock_thread.return_value = mock_entry
result = await index_kimi_artifact(42, "My Research", "Some research content here")
@@ -529,7 +535,9 @@ class TestIndexKimiArtifact:
async def test_exception_returns_failure(self):
from timmy.kimi_delegation import index_kimi_artifact
with patch("timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock) as mock_thread:
with patch(
"timmy.kimi_delegation.asyncio.to_thread", new_callable=AsyncMock
) as mock_thread:
mock_thread.side_effect = Exception("DB error")
result = await index_kimi_artifact(42, "title", "some content")
@@ -634,8 +642,15 @@ class TestDelegateResearchToKimi:
"timmy.kimi_delegation.create_kimi_research_issue",
new_callable=AsyncMock,
) as mock_create:
mock_create.return_value = {"success": True, "issue_number": 7, "issue_url": "http://x", "error": None}
result = await delegate_research_to_kimi("Research X", "ctx", "What is X?", priority="high")
mock_create.return_value = {
"success": True,
"issue_number": 7,
"issue_url": "http://x",
"error": None,
}
result = await delegate_research_to_kimi(
"Research X", "ctx", "What is X?", priority="high"
)
assert result["success"] is True
assert result["issue_number"] == 7

View File

@@ -841,11 +841,7 @@ class TestEdgeCases:
def test_metadata_with_nested_structure(self, patched_db):
"""Test storing metadata with nested structure."""
metadata = {
"level1": {
"level2": {
"level3": ["item1", "item2"]
}
},
"level1": {"level2": {"level3": ["item1", "item2"]}},
"number": 42,
"boolean": True,
"null": None,

View File

@@ -43,7 +43,10 @@ class TestVassalCycleRecord:
record.dispatched_to_claude = 3
record.dispatched_to_kimi = 1
record.dispatched_to_timmy = 2
assert record.dispatched_to_claude + record.dispatched_to_kimi + record.dispatched_to_timmy == 6
assert (
record.dispatched_to_claude + record.dispatched_to_kimi + record.dispatched_to_timmy
== 6
)
# ---------------------------------------------------------------------------
@@ -137,10 +140,22 @@ class TestRunCycle:
orch = VassalOrchestrator(cycle_interval=0)
with (
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_backlog", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_agent_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_house_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast", new_callable=AsyncMock),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_backlog",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_agent_health",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_house_health",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast",
new_callable=AsyncMock,
),
):
await orch.run_cycle()
await orch.run_cycle()
@@ -152,10 +167,22 @@ class TestRunCycle:
orch = VassalOrchestrator(cycle_interval=0)
with (
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_backlog", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_agent_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._step_house_health", new_callable=AsyncMock),
patch("timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast", new_callable=AsyncMock),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_backlog",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_agent_health",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._step_house_health",
new_callable=AsyncMock,
),
patch(
"timmy.vassal.orchestration_loop.VassalOrchestrator._broadcast",
new_callable=AsyncMock,
),
):
record = await orch.run_cycle()
@@ -366,7 +393,9 @@ class TestStepHouseHealth:
snapshot.disk = MagicMock()
snapshot.disk.percent_used = 50.0
with patch("timmy.vassal.house_health.get_system_snapshot", AsyncMock(return_value=snapshot)):
with patch(
"timmy.vassal.house_health.get_system_snapshot", AsyncMock(return_value=snapshot)
):
await orch._step_house_health(record)
assert record.house_warnings == ["low disk", "high cpu"]
@@ -384,7 +413,9 @@ class TestStepHouseHealth:
mock_cleanup = AsyncMock(return_value={"deleted_count": 7})
with (
patch("timmy.vassal.house_health.get_system_snapshot", AsyncMock(return_value=snapshot)),
patch(
"timmy.vassal.house_health.get_system_snapshot", AsyncMock(return_value=snapshot)
),
patch("timmy.vassal.house_health.cleanup_stale_files", mock_cleanup),
):
await orch._step_house_health(record)

View File

@@ -38,6 +38,7 @@ from timmy.quest_system import (
# Helpers
# ---------------------------------------------------------------------------
def _make_quest(
quest_id: str = "test_quest",
quest_type: QuestType = QuestType.ISSUE_COUNT,
@@ -77,6 +78,7 @@ def clean_state():
# QuestDefinition
# ---------------------------------------------------------------------------
class TestQuestDefinition:
def test_from_dict_minimal(self):
data = {"id": "q1"}
@@ -123,6 +125,7 @@ class TestQuestDefinition:
# QuestProgress
# ---------------------------------------------------------------------------
class TestQuestProgress:
def test_to_dict_roundtrip(self):
progress = QuestProgress(
@@ -158,6 +161,7 @@ class TestQuestProgress:
# _get_progress_key
# ---------------------------------------------------------------------------
def test_get_progress_key():
assert _get_progress_key("q1", "agent_a") == "agent_a:q1"
@@ -172,6 +176,7 @@ def test_get_progress_key_different_agents():
# load_quest_config
# ---------------------------------------------------------------------------
class TestLoadQuestConfig:
def test_missing_file_returns_empty(self, tmp_path):
missing = tmp_path / "nonexistent.yaml"
@@ -252,6 +257,7 @@ quests:
# get_quest_definitions / get_quest_definition / get_active_quests
# ---------------------------------------------------------------------------
class TestQuestLookup:
def setup_method(self):
q1 = _make_quest("q1", enabled=True)
@@ -282,6 +288,7 @@ class TestQuestLookup:
# _get_target_value
# ---------------------------------------------------------------------------
class TestGetTargetValue:
def test_issue_count(self):
q = _make_quest(quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 7})
@@ -316,6 +323,7 @@ class TestGetTargetValue:
# get_or_create_progress / get_quest_progress
# ---------------------------------------------------------------------------
class TestProgressCreation:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", criteria={"target_count": 5})
@@ -352,6 +360,7 @@ class TestProgressCreation:
# update_quest_progress
# ---------------------------------------------------------------------------
class TestUpdateQuestProgress:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", criteria={"target_count": 3})
@@ -398,6 +407,7 @@ class TestUpdateQuestProgress:
# _is_on_cooldown
# ---------------------------------------------------------------------------
class TestIsOnCooldown:
def test_non_repeatable_never_on_cooldown(self):
quest = _make_quest(repeatable=False, cooldown_hours=24)
@@ -466,6 +476,7 @@ class TestIsOnCooldown:
# claim_quest_reward
# ---------------------------------------------------------------------------
class TestClaimQuestReward:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=25)
@@ -553,7 +564,9 @@ class TestClaimQuestReward:
progress.status = QuestStatus.COMPLETED
progress.completed_at = datetime.now(UTC).isoformat()
with patch("timmy.quest_system.create_invoice_entry", side_effect=Exception("ledger error")):
with patch(
"timmy.quest_system.create_invoice_entry", side_effect=Exception("ledger error")
):
result = claim_quest_reward("q1", "agent_a")
assert result is None
@@ -563,10 +576,13 @@ class TestClaimQuestReward:
# check_issue_count_quest
# ---------------------------------------------------------------------------
class TestCheckIssueCountQuest:
def setup_method(self):
qs._quest_definitions["iq"] = _make_quest(
"iq", quest_type=QuestType.ISSUE_COUNT, criteria={"target_count": 2, "issue_labels": ["bug"]}
"iq",
quest_type=QuestType.ISSUE_COUNT,
criteria={"target_count": 2, "issue_labels": ["bug"]},
)
def test_counts_matching_issues(self):
@@ -575,9 +591,7 @@ class TestCheckIssueCountQuest:
{"labels": [{"name": "bug"}, {"name": "priority"}]},
{"labels": [{"name": "feature"}]}, # doesn't match
]
progress = check_issue_count_quest(
qs._quest_definitions["iq"], "agent_a", issues
)
progress = check_issue_count_quest(qs._quest_definitions["iq"], "agent_a", issues)
assert progress.current_value == 2
assert progress.status == QuestStatus.COMPLETED
@@ -604,6 +618,7 @@ class TestCheckIssueCountQuest:
# check_issue_reduce_quest
# ---------------------------------------------------------------------------
class TestCheckIssueReduceQuest:
def setup_method(self):
qs._quest_definitions["ir"] = _make_quest(
@@ -628,6 +643,7 @@ class TestCheckIssueReduceQuest:
# check_daily_run_quest
# ---------------------------------------------------------------------------
class TestCheckDailyRunQuest:
def setup_method(self):
qs._quest_definitions["dr"] = _make_quest(
@@ -649,6 +665,7 @@ class TestCheckDailyRunQuest:
# evaluate_quest_progress
# ---------------------------------------------------------------------------
class TestEvaluateQuestProgress:
def setup_method(self):
qs._quest_definitions["iq"] = _make_quest(
@@ -695,7 +712,13 @@ class TestEvaluateQuestProgress:
assert result is None
def test_cooldown_prevents_evaluation(self):
q = _make_quest("rep_iq", quest_type=QuestType.ISSUE_COUNT, repeatable=True, cooldown_hours=24, criteria={"target_count": 1})
q = _make_quest(
"rep_iq",
quest_type=QuestType.ISSUE_COUNT,
repeatable=True,
cooldown_hours=24,
criteria={"target_count": 1},
)
qs._quest_definitions["rep_iq"] = q
progress = get_or_create_progress("rep_iq", "agent_a")
recent = datetime.now(UTC) - timedelta(hours=1)
@@ -711,6 +734,7 @@ class TestEvaluateQuestProgress:
# reset_quest_progress
# ---------------------------------------------------------------------------
class TestResetQuestProgress:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1")
@@ -755,6 +779,7 @@ class TestResetQuestProgress:
# get_quest_leaderboard
# ---------------------------------------------------------------------------
class TestGetQuestLeaderboard:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=10)
@@ -798,6 +823,7 @@ class TestGetQuestLeaderboard:
# get_agent_quests_status
# ---------------------------------------------------------------------------
class TestGetAgentQuestsStatus:
def setup_method(self):
qs._quest_definitions["q1"] = _make_quest("q1", reward_tokens=10)

View File

@@ -1,4 +1,4 @@
"""Unit tests for src/timmy/research.py — ResearchOrchestrator pipeline.
"""Unit tests for src/timmy/research/ — ResearchOrchestrator pipeline.
Refs #972 (governing spec), #975 (ResearchOrchestrator).
"""
@@ -22,7 +22,7 @@ class TestListTemplates:
def test_returns_list(self, tmp_path, monkeypatch):
(tmp_path / "tool_evaluation.md").write_text("---\n---\n# T")
(tmp_path / "game_analysis.md").write_text("---\n---\n# G")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import list_templates
@@ -32,7 +32,7 @@ class TestListTemplates:
assert "game_analysis" in result
def test_returns_empty_when_dir_missing(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path / "nonexistent")
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path / "nonexistent")
from timmy.research import list_templates
@@ -54,7 +54,7 @@ class TestLoadTemplate:
"tool_evaluation",
"---\nname: Tool Evaluation\ntype: research\n---\n# Tool Eval: {domain}",
)
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
@@ -64,7 +64,7 @@ class TestLoadTemplate:
def test_fills_slots(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "arch", "Connect {system_a} to {system_b}")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
@@ -74,7 +74,7 @@ class TestLoadTemplate:
def test_unfilled_slots_preserved(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "t", "Hello {name} and {other}")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
@@ -82,7 +82,7 @@ class TestLoadTemplate:
assert "{other}" in result
def test_raises_file_not_found_for_missing_template(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
@@ -91,7 +91,7 @@ class TestLoadTemplate:
def test_no_slots_returns_raw_body(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "plain", "---\n---\nJust text here")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
@@ -109,7 +109,7 @@ class TestCheckCache:
mock_mem = MagicMock()
mock_mem.search.return_value = []
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
with patch("timmy.research.coordinator.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("some topic")
@@ -121,7 +121,7 @@ class TestCheckCache:
mock_mem = MagicMock()
mock_mem.search.return_value = [("cached report text", 0.91)]
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
with patch("timmy.research.coordinator.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("same topic")
@@ -133,7 +133,7 @@ class TestCheckCache:
mock_mem = MagicMock()
mock_mem.search.return_value = [("old report", 0.60)]
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
with patch("timmy.research.coordinator.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("slightly different topic")
@@ -142,7 +142,7 @@ class TestCheckCache:
assert score == 0.0
def test_degrades_gracefully_on_import_error(self):
with patch("timmy.research.SemanticMemory", None):
with patch("timmy.research.coordinator.SemanticMemory", None):
from timmy.research import _check_cache
content, score = _check_cache("topic")
@@ -160,7 +160,7 @@ class TestStoreResult:
def test_calls_store_memory(self):
mock_store = MagicMock()
with patch("timmy.research.store_memory", mock_store):
with patch("timmy.research.coordinator.store_memory", mock_store):
from timmy.research import _store_result
_store_result("test topic", "# Report\n\nContent here.")
@@ -171,7 +171,7 @@ class TestStoreResult:
def test_degrades_gracefully_on_error(self):
mock_store = MagicMock(side_effect=RuntimeError("db error"))
with patch("timmy.research.store_memory", mock_store):
with patch("timmy.research.coordinator.store_memory", mock_store):
from timmy.research import _store_result
# Should not raise
@@ -185,7 +185,7 @@ class TestStoreResult:
class TestSaveToDisk:
def test_writes_file(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
monkeypatch.setattr("timmy.research.coordinator._DOCS_ROOT", tmp_path / "research")
from timmy.research import _save_to_disk
@@ -195,7 +195,7 @@ class TestSaveToDisk:
assert path.read_text() == "# Test Report"
def test_slugifies_topic_name(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
monkeypatch.setattr("timmy.research.coordinator._DOCS_ROOT", tmp_path / "research")
from timmy.research import _save_to_disk
@@ -207,7 +207,7 @@ class TestSaveToDisk:
def test_returns_none_on_error(self, monkeypatch):
monkeypatch.setattr(
"timmy.research._DOCS_ROOT",
"timmy.research.coordinator._DOCS_ROOT",
Path("/nonexistent_root/deeply/nested"),
)
@@ -229,7 +229,7 @@ class TestRunResearch:
async def test_returns_cached_result_when_cache_hit(self):
cached_report = "# Cached Report\n\nPreviously computed."
with (
patch("timmy.research._check_cache", return_value=(cached_report, 0.93)),
patch("timmy.research.coordinator._check_cache", return_value=(cached_report, 0.93)),
):
from timmy.research import run_research
@@ -242,21 +242,23 @@ class TestRunResearch:
@pytest.mark.asyncio
async def test_skips_cache_when_requested(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=("cached", 0.99)) as mock_cache,
patch(
"timmy.research._formulate_queries",
"timmy.research.coordinator._check_cache", return_value=("cached", 0.99)
) as mock_cache,
patch(
"timmy.research.sources._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
"timmy.research.sources._synthesize",
new=AsyncMock(return_value=("# Fresh report", "ollama")),
),
patch("timmy.research._store_result"),
patch("timmy.research.coordinator._store_result"),
):
from timmy.research import run_research
@@ -268,21 +270,21 @@ class TestRunResearch:
@pytest.mark.asyncio
async def test_full_pipeline_no_search_results(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch("timmy.research.coordinator._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
"timmy.research.sources._formulate_queries",
new=AsyncMock(return_value=["query 1", "query 2"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
"timmy.research.sources._synthesize",
new=AsyncMock(return_value=("# Report", "ollama")),
),
patch("timmy.research._store_result"),
patch("timmy.research.coordinator._store_result"),
):
from timmy.research import run_research
@@ -296,21 +298,21 @@ class TestRunResearch:
@pytest.mark.asyncio
async def test_returns_result_with_error_on_bad_template(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch("timmy.research.coordinator._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
"timmy.research.sources._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
"timmy.research.sources._synthesize",
new=AsyncMock(return_value=("# Report", "ollama")),
),
patch("timmy.research._store_result"),
patch("timmy.research.coordinator._store_result"),
):
from timmy.research import run_research
@@ -321,22 +323,22 @@ class TestRunResearch:
@pytest.mark.asyncio
async def test_saves_to_disk_when_requested(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._DOCS_ROOT", tmp_path / "research")
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch("timmy.research.coordinator._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
"timmy.research.sources._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
"timmy.research.sources._synthesize",
new=AsyncMock(return_value=("# Saved Report", "ollama")),
),
patch("timmy.research._store_result"),
patch("timmy.research.coordinator._store_result"),
):
from timmy.research import run_research
@@ -349,21 +351,21 @@ class TestRunResearch:
@pytest.mark.asyncio
async def test_result_is_not_empty_after_synthesis(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research.coordinator._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch("timmy.research.coordinator._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
"timmy.research.sources._formulate_queries",
new=AsyncMock(return_value=["q"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research.sources._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
"timmy.research.sources._synthesize",
new=AsyncMock(return_value=("# Non-empty", "ollama")),
),
patch("timmy.research._store_result"),
patch("timmy.research.coordinator._store_result"),
):
from timmy.research import run_research

View File

@@ -40,9 +40,7 @@ class TestGoogleWebSearch:
with patch("timmy.research_tools.GoogleSearch", mock_search_cls):
result = await google_web_search("python tutorial")
mock_search_cls.assert_called_once_with(
{"q": "python tutorial", "api_key": "test-key-123"}
)
mock_search_cls.assert_called_once_with({"q": "python tutorial", "api_key": "test-key-123"})
assert "Hello" in result
@pytest.mark.asyncio

View File

@@ -175,7 +175,7 @@ class TestGatherSovereigntyData:
delta = data["deltas"].get("cache_hit_rate")
assert delta is not None
assert delta["start"] == 0.1 # oldest in window
assert delta["end"] == 0.5 # most recent
assert delta["end"] == 0.5 # most recent
assert data["previous_session"]["cache_hit_rate"] == 0.3
def test_single_data_point_no_delta(self):
@@ -334,7 +334,9 @@ class TestCommitReport:
assert result is True
mock_client.put.assert_called_once()
call_kwargs = mock_client.put.call_args
payload = call_kwargs.kwargs.get("json", call_kwargs.args[1] if len(call_kwargs.args) > 1 else {})
payload = call_kwargs.kwargs.get(
"json", call_kwargs.args[1] if len(call_kwargs.args) > 1 else {}
)
decoded = base64.b64decode(payload["content"]).decode()
assert "# report content" in decoded

View File

@@ -224,9 +224,11 @@ class TestConsultGrok:
mock_settings = MagicMock()
mock_settings.grok_free = True
with patch("timmy.backends.grok_available", return_value=True), \
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
patch("config.settings", mock_settings):
with (
patch("timmy.backends.grok_available", return_value=True),
patch("timmy.backends.get_grok_backend", return_value=mock_backend),
patch("config.settings", mock_settings),
):
result = consult_grok("What is 2+2?")
assert result == "Answer text"
@@ -240,10 +242,12 @@ class TestConsultGrok:
mock_settings = MagicMock()
mock_settings.grok_free = True
with patch("timmy.backends.grok_available", return_value=True), \
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
patch("config.settings", mock_settings), \
patch.dict("sys.modules", {"spark.engine": None}):
with (
patch("timmy.backends.grok_available", return_value=True),
patch("timmy.backends.get_grok_backend", return_value=mock_backend),
patch("config.settings", mock_settings),
patch.dict("sys.modules", {"spark.engine": None}),
):
result = consult_grok("hello")
assert result == "ok"
@@ -262,10 +266,12 @@ class TestConsultGrok:
mock_ln_backend.create_invoice.side_effect = OSError("LN down")
mock_lightning.get_backend.return_value = mock_ln_backend
with patch("timmy.backends.grok_available", return_value=True), \
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
patch("config.settings", mock_settings), \
patch.dict("sys.modules", {"lightning.factory": mock_lightning}):
with (
patch("timmy.backends.grok_available", return_value=True),
patch("timmy.backends.get_grok_backend", return_value=mock_backend),
patch("config.settings", mock_settings),
patch.dict("sys.modules", {"lightning.factory": mock_lightning}),
):
result = consult_grok("expensive query")
assert "Error" in result
@@ -313,7 +319,9 @@ class TestWebFetch:
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = None
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com")
assert "Error: could not extract" in result
@@ -329,7 +337,9 @@ class TestWebFetch:
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = long_text
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com", max_tokens=100)
assert "[…truncated" in result
@@ -345,7 +355,9 @@ class TestWebFetch:
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = "Hello"
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com")
assert result == "Hello"
@@ -358,7 +370,9 @@ class TestWebFetch:
mock_requests.get.side_effect = exc_mod.Timeout("timed out")
mock_trafilatura = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com")
assert "timed out" in result
@@ -375,7 +389,9 @@ class TestWebFetch:
)
mock_trafilatura = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com/nope")
assert "404" in result
@@ -388,7 +404,9 @@ class TestWebFetch:
mock_requests.get.side_effect = exc_mod.RequestException("connection refused")
mock_trafilatura = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("https://example.com")
assert "Error" in result
@@ -404,7 +422,9 @@ class TestWebFetch:
mock_requests.exceptions = _make_request_exceptions()
mock_trafilatura.extract.return_value = "content"
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
result = web_fetch("http://example.com")
assert result == "content"

View File

@@ -178,9 +178,7 @@ class TestScrapeUrl:
def test_sync_result_returned_immediately(self):
"""If Crawl4AI returns results in the POST response, use them directly."""
mock_data = {
"results": [{"markdown": "# Hello\n\nThis is the page content."}]
}
mock_data = {"results": [{"markdown": "# Hello\n\nThis is the page content."}]}
mock_req = _mock_requests(json_response=mock_data)
with patch.dict("sys.modules", {"requests": mock_req}):
with patch("timmy.tools.search.settings") as mock_settings:

View File

@@ -20,32 +20,36 @@ class TestIsAppleSilicon:
def test_returns_true_on_arm64_darwin(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Darwin"), patch(
"platform.machine", return_value="arm64"
with (
patch("platform.system", return_value="Darwin"),
patch("platform.machine", return_value="arm64"),
):
assert is_apple_silicon() is True
def test_returns_false_on_intel_mac(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Darwin"), patch(
"platform.machine", return_value="x86_64"
with (
patch("platform.system", return_value="Darwin"),
patch("platform.machine", return_value="x86_64"),
):
assert is_apple_silicon() is False
def test_returns_false_on_linux(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Linux"), patch(
"platform.machine", return_value="x86_64"
with (
patch("platform.system", return_value="Linux"),
patch("platform.machine", return_value="x86_64"),
):
assert is_apple_silicon() is False
def test_returns_false_on_windows(self):
from timmy.backends import is_apple_silicon
with patch("platform.system", return_value="Windows"), patch(
"platform.machine", return_value="AMD64"
with (
patch("platform.system", return_value="Windows"),
patch("platform.machine", return_value="AMD64"),
):
assert is_apple_silicon() is False
@@ -96,7 +100,9 @@ class TestAirLLMGracefulDegradation:
raise ImportError("No module named 'airllm'")
return original_import(name, *args, **kwargs)
original_import = __builtins__["__import__"] if isinstance(__builtins__, dict) else __import__
original_import = (
__builtins__["__import__"] if isinstance(__builtins__, dict) else __import__
)
with (
patch("timmy.backends.is_apple_silicon", return_value=True),

View File

@@ -197,9 +197,7 @@ class TestExtractClip:
@pytest.mark.asyncio
async def test_uses_default_highlight_id_when_missing(self):
with patch("content.extraction.clipper._ffmpeg_available", return_value=False):
result = await extract_clip(
{"source_path": "/a.mp4", "start_time": 0, "end_time": 5}
)
result = await extract_clip({"source_path": "/a.mp4", "start_time": 0, "end_time": 5})
assert result.highlight_id == "unknown"

View File

@@ -22,7 +22,9 @@ class TestSha256File:
result = _sha256_file(str(f))
assert isinstance(result, str)
assert len(result) == 64 # SHA-256 hex is 64 chars
assert result == "b94d27b9934d3e08a52e52d7da7dabfac484efe04294e576b4b4857ad9c2f37"[0:0] or True
assert (
result == "b94d27b9934d3e08a52e52d7da7dabfac484efe04294e576b4b4857ad9c2f37"[0:0] or True
)
def test_consistent_for_same_content(self, tmp_path):
f = tmp_path / "test.bin"
@@ -51,9 +53,7 @@ class TestSha256File:
class TestPublishEpisode:
@pytest.mark.asyncio
async def test_returns_failure_when_video_missing(self, tmp_path):
result = await publish_episode(
str(tmp_path / "nonexistent.mp4"), "Title"
)
result = await publish_episode(str(tmp_path / "nonexistent.mp4"), "Title")
assert result.success is False
assert "not found" in result.error

View File

@@ -42,11 +42,7 @@ def test_model_size_unknown_returns_default(monitor):
def test_read_battery_watts_on_battery(monitor):
ioreg_output = (
"{\n"
' "InstantAmperage" = 2500\n'
' "Voltage" = 12000\n'
' "ExternalConnected" = No\n'
"}"
'{\n "InstantAmperage" = 2500\n "Voltage" = 12000\n "ExternalConnected" = No\n}'
)
mock_result = MagicMock()
mock_result.stdout = ioreg_output
@@ -60,11 +56,7 @@ def test_read_battery_watts_on_battery(monitor):
def test_read_battery_watts_plugged_in_returns_zero(monitor):
ioreg_output = (
"{\n"
' "InstantAmperage" = 1000\n'
' "Voltage" = 12000\n'
' "ExternalConnected" = Yes\n'
"}"
'{\n "InstantAmperage" = 1000\n "Voltage" = 12000\n "ExternalConnected" = Yes\n}'
)
mock_result = MagicMock()
mock_result.stdout = ioreg_output
@@ -85,10 +77,7 @@ def test_read_battery_watts_subprocess_failure_raises(monitor):
def test_read_cpu_pct_parses_top(monitor):
top_output = (
"Processes: 450 total\n"
"CPU usage: 15.2% user, 8.8% sys, 76.0% idle\n"
)
top_output = "Processes: 450 total\nCPU usage: 15.2% user, 8.8% sys, 76.0% idle\n"
mock_result = MagicMock()
mock_result.stdout = top_output

View File

@@ -516,9 +516,7 @@ class TestCountActiveKimiIssues:
resp.json.return_value = []
mock_client.get.return_value = resp
await _count_active_kimi_issues(
mock_client, "http://gitea.local/api/v1", {}, "owner/repo"
)
await _count_active_kimi_issues(mock_client, "http://gitea.local/api/v1", {}, "owner/repo")
call_kwargs = mock_client.get.call_args.kwargs
assert call_kwargs["params"]["state"] == "open"
assert call_kwargs["params"]["labels"] == KIMI_READY_LABEL
@@ -557,9 +555,7 @@ class TestKimiCapEnforcement:
async def test_cap_reached_returns_failure(self):
from timmy.kimi_delegation import create_kimi_research_issue
async_ctx = self._make_async_client(
[{"name": "kimi-ready", "id": 7}], issue_count=3
)
async_ctx = self._make_async_client([{"name": "kimi-ready", "id": 7}], issue_count=3)
with (
patch("config.settings", self._make_settings()),
@@ -575,9 +571,7 @@ class TestKimiCapEnforcement:
async def test_cap_exceeded_returns_failure(self):
from timmy.kimi_delegation import create_kimi_research_issue
async_ctx = self._make_async_client(
[{"name": "kimi-ready", "id": 7}], issue_count=5
)
async_ctx = self._make_async_client([{"name": "kimi-ready", "id": 7}], issue_count=5)
with (
patch("config.settings", self._make_settings()),

View File

@@ -77,7 +77,7 @@ class TestSchnorrVerify:
kp = generate_keypair()
msg = b"\x00" * 32
sig = schnorr_sign(msg, kp.privkey_bytes)
bad_msg = b"\xFF" * 32
bad_msg = b"\xff" * 32
assert schnorr_verify(bad_msg, kp.pubkey_bytes, sig) is False
def test_wrong_lengths_return_false(self):

View File

@@ -1,6 +1,5 @@
"""Unit tests for infrastructure.self_correction."""
import pytest
# ---------------------------------------------------------------------------
@@ -192,14 +191,22 @@ class TestGetPatterns:
from infrastructure.self_correction import get_patterns, log_self_correction
log_self_correction(
source="test", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o",
error_type="Foo", outcome_status="success",
source="test",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
error_type="Foo",
outcome_status="success",
)
log_self_correction(
source="test", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o",
error_type="Foo", outcome_status="failed",
source="test",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
error_type="Foo",
outcome_status="failed",
)
patterns = get_patterns(top_n=5)
foo = next(p for p in patterns if p["error_type"] == "Foo")
@@ -211,13 +218,21 @@ class TestGetPatterns:
for _ in range(2):
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", error_type="Rare",
source="t",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
error_type="Rare",
)
for _ in range(5):
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", error_type="Common",
source="t",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
error_type="Common",
)
patterns = get_patterns(top_n=5)
assert patterns[0]["error_type"] == "Common"
@@ -240,12 +255,20 @@ class TestGetStats:
from infrastructure.self_correction import get_stats, log_self_correction
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", outcome_status="success",
source="t",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
outcome_status="success",
)
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", outcome_status="failed",
source="t",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
outcome_status="failed",
)
stats = get_stats()
assert stats["total"] == 2
@@ -258,8 +281,12 @@ class TestGetStats:
for _ in range(4):
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", outcome_status="success",
source="t",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
outcome_status="success",
)
stats = get_stats()
assert stats["success_rate"] == 100