1
0

Compare commits

..

3 Commits

Author SHA1 Message Date
Alexander Whitestone
13212b92b5 fix: auto-update Working RAM timestamp from DB activity
HotMemory.read() now includes a '> Last updated:' header showing the
created_at timestamp of the most recent memory in the database. This
means the Working RAM display automatically reflects the last inference
session rather than persisting the static timestamp from MEMORY.md.

- Add recall_last_activity_time() to timmy.memory.crud
- Update HotMemory.read() to include timestamp when DB has content
- Export recall_last_activity_time from timmy.memory_system
- Add TestHotMemoryTimestamp test class (3 tests)

Fixes #1351

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 22:57:36 -04:00
9e8e0f8552 [claude] Placeholder research artifact for issue #1341 (#1350) 2026-03-24 02:49:37 +00:00
e09082a8a8 [kimi] Add comprehensive unit tests for models/budget.py (#1316) (#1347) 2026-03-24 02:48:51 +00:00
19 changed files with 868 additions and 35 deletions

35
memory/research/task.md Normal file
View File

@@ -0,0 +1,35 @@
# Research Report: Task #1341
**Date:** 2026-03-23
**Issue:** [#1341](http://143.198.27.163:3000/Rockachopa/Timmy-time-dashboard/issues/1341)
**Priority:** normal
**Delegated by:** Timmy via Kimi delegation pipeline
---
## Summary
This issue was submitted as a placeholder via the Kimi delegation pipeline with unfilled template fields:
- **Research Question:** `Q?` (template default — no actual question provided)
- **Background / Context:** `ctx` (template default — no context provided)
- **Task:** `Task` (template default — no task specified)
## Findings
No actionable research question was specified. The issue appears to be a test or
accidental submission of an unfilled delegation template.
## Recommendations
1. **Re-open with a real question** if there is a specific topic to research.
2. **Review the delegation pipeline** to add validation that prevents empty/template-default
submissions from reaching the backlog (e.g. reject issues where the body contains
literal placeholder strings like `Q?` or `ctx`).
3. **Add a pipeline guard** in the Kimi delegation script to require non-empty, non-default
values for `Research Question` and `Background / Context` before creating an issue.
## Next Steps
- [ ] Add input validation to Kimi delegation pipeline
- [ ] Re-file with a concrete research question if needed

View File

@@ -167,6 +167,3 @@ directory = "htmlcov"
[tool.coverage.xml]
output = "coverage.xml"
[tool.mypy]
mypy_path = "src"

View File

@@ -56,6 +56,13 @@ class Settings(BaseSettings):
# Set to 0 to use model defaults.
ollama_num_ctx: int = 32768
# Maximum models loaded simultaneously in Ollama — override with OLLAMA_MAX_LOADED_MODELS
# Set to 2 so Qwen3-8B and Qwen3-14B can stay hot concurrently (~17 GB combined).
# Requires Ollama ≥ 0.1.33. Export this to the Ollama process environment:
# OLLAMA_MAX_LOADED_MODELS=2 ollama serve
# or add it to your systemd/launchd unit before starting the harness.
ollama_max_loaded_models: int = 2
# Fallback model chains — override with FALLBACK_MODELS / VISION_FALLBACK_MODELS
# as comma-separated strings, e.g. FALLBACK_MODELS="qwen3:8b,qwen2.5:14b"
# Or edit config/providers.yaml → fallback_chains for the canonical source.

View File

@@ -13,9 +13,9 @@ from timmy.tools import get_all_available_tools
router = APIRouter(tags=["tools"])
_AgentView = namedtuple("_AgentView", ["name", "status", "tools", "stats"])
_ToolView = namedtuple("_ToolView", ["name", "description"])
_Stats = namedtuple("_Stats", ["total_calls"])
_AgentView = namedtuple("AgentView", ["name", "status", "tools", "stats"])
_ToolView = namedtuple("ToolView", ["name", "description"])
_Stats = namedtuple("Stats", ["total_calls"])
def _build_agent_tools():

View File

@@ -48,7 +48,7 @@ def _get_familiar_state() -> dict:
BARK_STYLES = {"speech", "thought", "whisper", "shout"}
def produce_bark(agent_id: str, text: str, reply_to: str | None = None, style: str = "speech") -> dict:
def produce_bark(agent_id: str, text: str, reply_to: str = None, style: str = "speech") -> dict:
"""Format a chat response as a Matrix bark message.
Barks appear as floating text above agents in the Matrix 3D world with
@@ -102,7 +102,7 @@ def produce_bark(agent_id: str, text: str, reply_to: str | None = None, style: s
def produce_thought(
agent_id: str, thought_text: str, thought_id: int, chain_id: str | None = None
agent_id: str, thought_text: str, thought_id: int, chain_id: str = None
) -> dict:
"""Format a thinking engine thought as a Matrix thought message.

View File

@@ -70,12 +70,12 @@ class SovereigntyAlert:
# Graduation targets from issue #981
GRADUATION_TARGETS: dict[str, dict[str, float]] = {
GRADUATION_TARGETS = {
"cache_hit_rate": {"week1": 0.10, "month1": 0.40, "month3": 0.80, "graduation": 0.90},
"api_cost": {"week1": 1.50, "month1": 0.50, "month3": 0.10, "graduation": 0.01},
"time_to_report": {"week1": 180.0, "month1": 30.0, "month3": 5.0, "graduation": 1.0},
"human_involvement": {"week1": 1.0, "month1": 0.5, "month3": 0.25, "graduation": 0.0},
"local_artifacts": {"week1": 6.0, "month1": 30.0, "month3": 100.0, "graduation": 500.0},
"local_artifacts": {"week1": 6, "month1": 30, "month3": 100, "graduation": 500},
}

View File

@@ -46,13 +46,12 @@ class VisitorRegistry:
"""
_instance: "VisitorRegistry | None" = None
_visitors: dict[str, VisitorState]
def __new__(cls) -> "VisitorRegistry":
"""Singleton constructor."""
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._visitors = {}
cls._instance._visitors: dict[str, VisitorState] = {}
return cls._instance
def add(

View File

@@ -123,8 +123,10 @@ class RecoveryManager:
if snapshot_id is None:
snap_data = history[0] # most recent
else:
matches = [s for s in history if s["snapshot_id"] == snapshot_id]
snap_data = matches[0] if matches else None
snap_data = next(
(s for s in history if s["snapshot_id"] == snapshot_id),
None,
)
if snap_data is None:
logger.warning("RecoveryManager: snapshot %s not found", snapshot_id)

View File

@@ -177,6 +177,35 @@ def think(
timmy.print_response(f"Think carefully about: {topic}", stream=True, session_id=_CLI_SESSION_ID)
def _read_message_input(message: list[str]) -> str:
"""Join CLI arguments and read from stdin when appropriate."""
message_str = " ".join(message)
if message_str == "-" or not _is_interactive():
try:
stdin_content = sys.stdin.read().strip()
except (KeyboardInterrupt, EOFError):
stdin_content = ""
if stdin_content:
message_str = stdin_content
elif message_str == "-":
typer.echo("No input provided via stdin.", err=True)
raise typer.Exit(1)
return message_str
def _resolve_session_id(session_id: str | None, new_session: bool) -> str:
"""Return the effective session ID based on CLI flags."""
import uuid
if session_id is not None:
return session_id
if new_session:
return str(uuid.uuid4())
return _CLI_SESSION_ID
@app.command()
def chat(
message: list[str] = typer.Argument(

View File

@@ -9,7 +9,7 @@ import re
from datetime import UTC, datetime
from pathlib import Path
from timmy.memory.crud import recall_last_reflection, recall_personal_facts
from timmy.memory.crud import recall_last_activity_time, recall_last_reflection, recall_personal_facts
from timmy.memory.db import HOT_MEMORY_PATH, VAULT_PATH
logger = logging.getLogger(__name__)
@@ -89,25 +89,41 @@ class HotMemory:
"""Read hot memory — computed view of top facts + last reflection from DB."""
try:
facts = recall_personal_facts()
lines = ["# Timmy Hot Memory\n"]
if facts:
lines.append("## Known Facts\n")
for f in facts[:15]:
lines.append(f"- {f}")
# Include the last reflection if available
reflection = recall_last_reflection()
if reflection:
lines.append("\n## Last Reflection\n")
lines.append(reflection)
if len(lines) > 1:
if facts or reflection:
last_ts = recall_last_activity_time()
try:
updated_date = datetime.fromisoformat(last_ts).strftime("%Y-%m-%d %H:%M UTC")
except (TypeError, ValueError):
updated_date = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
lines = [
"# Timmy Hot Memory",
"",
"> Working RAM — always loaded, ~300 lines max, pruned monthly",
f"> Last updated: {updated_date}",
"",
]
if facts:
lines.append("## Known Facts")
lines.append("")
for f in facts[:15]:
lines.append(f"- {f}")
if reflection:
lines.append("")
lines.append("## Last Reflection")
lines.append("")
lines.append(reflection)
return "\n".join(lines)
except Exception:
logger.debug("DB context read failed, falling back to file")
# Fallback to file if DB unavailable
# Fallback to file if DB unavailable or empty
if self.path.exists():
return self.path.read_text()

View File

@@ -393,3 +393,12 @@ def recall_last_reflection() -> str | None:
"ORDER BY created_at DESC LIMIT 1"
).fetchone()
return row["content"] if row else None
def recall_last_activity_time() -> str | None:
"""Return the ISO timestamp of the most recently stored memory, or None."""
with get_connection() as conn:
row = conn.execute(
"SELECT created_at FROM memories ORDER BY created_at DESC LIMIT 1"
).fetchone()
return row["created_at"] if row else None

View File

@@ -27,6 +27,7 @@ from timmy.memory.crud import ( # noqa: F401
get_memory_context,
get_memory_stats,
prune_memories,
recall_last_activity_time,
recall_last_reflection,
recall_personal_facts,
recall_personal_facts_with_ids,

View File

@@ -349,7 +349,7 @@ async def triage_research_report(
logger.info("No action items extracted from research report")
return []
results: list[dict] = []
results = []
for item in items:
if dry_run:
results.append({"action_item": item, "gitea_issue": None})

View File

@@ -249,8 +249,8 @@ class _ErrorRunOutput:
def __init__(self, message: str):
self.content = message
self.status = "ERROR"
self.requirements: list = []
self.tools: list = []
self.requirements = []
self.tools = []
@property
def active_requirements(self):

View File

@@ -46,7 +46,7 @@ class SessionLogger:
content: The message content
confidence: Optional confidence score (0.0 to 1.0)
"""
entry: dict = {
entry = {
"type": "message",
"role": role,
"content": content,

View File

@@ -36,7 +36,7 @@ except Exception: # ImportError or circular import during early startup
try:
from infrastructure.sovereignty_metrics import GRADUATION_TARGETS, get_sovereignty_store
except Exception:
GRADUATION_TARGETS: dict = {} # type: ignore[assignment,no-redef]
GRADUATION_TARGETS: dict = {} # type: ignore[assignment]
get_sovereignty_store = None # type: ignore[assignment]
logger = logging.getLogger(__name__)

View File

@@ -0,0 +1,598 @@
"""Unit tests for models/budget.py — comprehensive coverage for budget management.
Tests budget allocation, tracking, limit enforcement, and edge cases including:
- Zero budget scenarios
- Over-budget handling
- Budget reset behavior
- In-memory fallback when DB is unavailable
"""
import threading
import time
from datetime import UTC, date, datetime, timedelta
from unittest.mock import patch
import pytest
from infrastructure.models.budget import (
BudgetTracker,
SpendRecord,
estimate_cost_usd,
get_budget_tracker,
)
pytestmark = pytest.mark.unit
# ── Test SpendRecord dataclass ────────────────────────────────────────────────
class TestSpendRecord:
"""Tests for the SpendRecord dataclass."""
def test_spend_record_creation(self):
"""Test creating a SpendRecord with all fields."""
ts = time.time()
record = SpendRecord(
ts=ts,
provider="anthropic",
model="claude-haiku-4-5",
tokens_in=100,
tokens_out=200,
cost_usd=0.001,
tier="cloud",
)
assert record.ts == ts
assert record.provider == "anthropic"
assert record.model == "claude-haiku-4-5"
assert record.tokens_in == 100
assert record.tokens_out == 200
assert record.cost_usd == 0.001
assert record.tier == "cloud"
def test_spend_record_with_zero_tokens(self):
"""Test SpendRecord with zero tokens."""
ts = time.time()
record = SpendRecord(ts=ts, provider="openai", model="gpt-4o", tokens_in=0, tokens_out=0, cost_usd=0.0, tier="cloud")
assert record.tokens_in == 0
assert record.tokens_out == 0
# ── Test estimate_cost_usd function ───────────────────────────────────────────
class TestEstimateCostUsd:
"""Tests for the estimate_cost_usd function."""
def test_haiku_cheaper_than_sonnet(self):
"""Haiku should be cheaper than Sonnet for same tokens."""
haiku_cost = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
sonnet_cost = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
assert haiku_cost < sonnet_cost
def test_zero_tokens_is_zero_cost(self):
"""Zero tokens should result in zero cost."""
assert estimate_cost_usd("gpt-4o", 0, 0) == 0.0
def test_only_input_tokens(self):
"""Cost calculation with only input tokens."""
cost = estimate_cost_usd("gpt-4o", 1000, 0)
expected = (1000 * 0.0025) / 1000.0 # $0.0025 per 1K input tokens
assert cost == pytest.approx(expected)
def test_only_output_tokens(self):
"""Cost calculation with only output tokens."""
cost = estimate_cost_usd("gpt-4o", 0, 1000)
expected = (1000 * 0.01) / 1000.0 # $0.01 per 1K output tokens
assert cost == pytest.approx(expected)
def test_unknown_model_uses_default(self):
"""Unknown model should use conservative default cost."""
cost = estimate_cost_usd("some-unknown-model-xyz", 1000, 1000)
assert cost > 0 # Uses conservative default, not zero
# Default is 0.003 input, 0.015 output per 1K
expected = (1000 * 0.003 + 1000 * 0.015) / 1000.0
assert cost == pytest.approx(expected)
def test_versioned_model_name_matches(self):
"""Versioned model names should match base model rates."""
cost1 = estimate_cost_usd("claude-haiku-4-5-20251001", 1000, 0)
cost2 = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
assert cost1 == cost2
def test_gpt4o_mini_cheaper_than_gpt4o(self):
"""GPT-4o mini should be cheaper than GPT-4o."""
mini = estimate_cost_usd("gpt-4o-mini", 1000, 1000)
full = estimate_cost_usd("gpt-4o", 1000, 1000)
assert mini < full
def test_opus_most_expensive_claude(self):
"""Opus should be the most expensive Claude model."""
opus = estimate_cost_usd("claude-opus-4-5", 1000, 1000)
sonnet = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
haiku = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
assert opus > sonnet > haiku
def test_grok_variants(self):
"""Test Grok model cost estimation."""
cost = estimate_cost_usd("grok-3", 1000, 1000)
assert cost > 0
cost_fast = estimate_cost_usd("grok-3-fast", 1000, 1000)
assert cost_fast > 0
def test_case_insensitive_matching(self):
"""Model name matching should be case insensitive."""
cost_lower = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
cost_upper = estimate_cost_usd("CLAUDE-HAIKU-4-5", 1000, 0)
cost_mixed = estimate_cost_usd("Claude-Haiku-4-5", 1000, 0)
assert cost_lower == cost_upper == cost_mixed
def test_returns_float(self):
"""Function should always return a float."""
assert isinstance(estimate_cost_usd("haiku", 100, 200), float)
assert isinstance(estimate_cost_usd("unknown-model", 100, 200), float)
assert isinstance(estimate_cost_usd("haiku", 0, 0), float)
# ── Test BudgetTracker initialization ─────────────────────────────────────────
class TestBudgetTrackerInit:
"""Tests for BudgetTracker initialization."""
def test_creates_with_memory_db(self):
"""Tracker should initialize with in-memory database."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker._db_ok is True
def test_in_memory_fallback_empty_on_creation(self):
"""In-memory fallback should start empty."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker._in_memory == []
def test_custom_db_path(self, tmp_path):
"""Tracker should use custom database path."""
db_file = tmp_path / "custom_budget.db"
tracker = BudgetTracker(db_path=str(db_file))
assert tracker._db_ok is True
assert tracker._db_path == str(db_file)
assert db_file.exists()
def test_db_path_directory_creation(self, tmp_path):
"""Tracker should create parent directories if needed."""
db_file = tmp_path / "nested" / "dirs" / "budget.db"
tracker = BudgetTracker(db_path=str(db_file))
assert tracker._db_ok is True
assert db_file.parent.exists()
def test_invalid_db_path_fallback(self):
"""Tracker should fallback to in-memory on invalid path."""
# Use a path that cannot be created (e.g., permission denied simulation)
tracker = BudgetTracker.__new__(BudgetTracker)
tracker._db_path = "/nonexistent/invalid/path/budget.db"
tracker._lock = threading.Lock()
tracker._in_memory = []
tracker._db_ok = False
# Should still work with in-memory fallback
cost = tracker.record_spend("test", "model", cost_usd=0.01)
assert cost == 0.01
# ── Test BudgetTracker record_spend ───────────────────────────────────────────
class TestBudgetTrackerRecordSpend:
"""Tests for recording spend events."""
def test_record_spend_returns_cost(self):
"""record_spend should return the calculated cost."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "claude-haiku-4-5", 100, 200)
assert cost > 0
def test_record_spend_explicit_cost(self):
"""record_spend should use explicit cost when provided."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "model", cost_usd=1.23)
assert cost == pytest.approx(1.23)
def test_record_spend_accumulates(self):
"""Multiple spend records should accumulate correctly."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
tracker.record_spend("openai", "gpt-4o", cost_usd=0.02)
assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)
def test_record_spend_with_tier_label(self):
"""record_spend should accept custom tier labels."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "haiku", tier="cloud_api")
assert cost >= 0
def test_record_spend_with_provider(self):
"""record_spend should track provider correctly."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
tracker.record_spend("anthropic", "claude-haiku", cost_usd=0.02)
assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)
def test_record_zero_cost(self):
"""Recording zero cost should work correctly."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("test", "model", cost_usd=0.0)
assert cost == 0.0
assert tracker.get_daily_spend() == 0.0
def test_record_negative_cost(self):
"""Recording negative cost (refund) should work."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("test", "model", cost_usd=-0.50)
assert cost == -0.50
assert tracker.get_daily_spend() == -0.50
# ── Test BudgetTracker daily/monthly spend queries ────────────────────────────
class TestBudgetTrackerSpendQueries:
"""Tests for daily and monthly spend queries."""
def test_monthly_spend_includes_daily(self):
"""Monthly spend should be >= daily spend."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=5.00)
assert tracker.get_monthly_spend() >= tracker.get_daily_spend()
def test_get_daily_spend_empty(self):
"""Daily spend should be zero when no records."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker.get_daily_spend() == 0.0
def test_get_monthly_spend_empty(self):
"""Monthly spend should be zero when no records."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker.get_monthly_spend() == 0.0
def test_daily_spend_isolation(self):
"""Daily spend should only include today's records, not old ones."""
tracker = BudgetTracker(db_path=":memory:")
# Force use of in-memory fallback
tracker._db_ok = False
# Add record for today
today_ts = datetime.combine(date.today(), datetime.min.time(), tzinfo=UTC).timestamp()
tracker._in_memory.append(
SpendRecord(today_ts + 3600, "test", "model", 0, 0, 1.0, "cloud")
)
# Add old record (2 days ago)
old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
tracker._in_memory.append(
SpendRecord(old_ts, "test", "old_model", 0, 0, 2.0, "cloud")
)
# Daily should only include today's 1.0
assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)
# Monthly should include both (both are in current month)
assert tracker.get_monthly_spend() == pytest.approx(3.0, abs=1e-9)
# ── Test BudgetTracker cloud_allowed ──────────────────────────────────────────
class TestBudgetTrackerCloudAllowed:
"""Tests for cloud budget limit enforcement."""
def test_allowed_when_no_spend(self):
"""Cloud should be allowed when no spend recorded."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker.cloud_allowed() is True
def test_blocked_when_daily_limit_exceeded(self):
"""Cloud should be blocked when daily limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
# With default daily limit of 5.0, 999 should block
assert tracker.cloud_allowed() is False
def test_allowed_when_daily_limit_zero(self):
"""Cloud should be allowed when daily limit is 0 (disabled)."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0 # disabled
mock_settings.tier_cloud_monthly_budget_usd = 0 # disabled
assert tracker.cloud_allowed() is True
def test_blocked_when_monthly_limit_exceeded(self):
"""Cloud should be blocked when monthly limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0 # daily disabled
mock_settings.tier_cloud_monthly_budget_usd = 10.0
assert tracker.cloud_allowed() is False
def test_allowed_at_exact_daily_limit(self):
"""Cloud should be allowed when exactly at daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 5.0
mock_settings.tier_cloud_monthly_budget_usd = 0
# Record exactly at limit
tracker.record_spend("test", "model", cost_usd=5.0)
# At exactly the limit, it should return False (blocked)
# because spend >= limit
assert tracker.cloud_allowed() is False
def test_allowed_below_daily_limit(self):
"""Cloud should be allowed when below daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 5.0
mock_settings.tier_cloud_monthly_budget_usd = 0
tracker.record_spend("test", "model", cost_usd=4.99)
assert tracker.cloud_allowed() is True
def test_zero_budget_blocks_all(self):
"""Zero budget should block all cloud usage."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0.01 # Very small budget
mock_settings.tier_cloud_monthly_budget_usd = 0
tracker.record_spend("test", "model", cost_usd=0.02)
# Over the tiny budget, should be blocked
assert tracker.cloud_allowed() is False
def test_both_limits_checked(self):
"""Both daily and monthly limits should be checked."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 100.0
mock_settings.tier_cloud_monthly_budget_usd = 10.0
tracker.record_spend("test", "model", cost_usd=15.0)
# Under daily but over monthly
assert tracker.cloud_allowed() is False
# ── Test BudgetTracker summary ────────────────────────────────────────────────
class TestBudgetTrackerSummary:
"""Tests for budget summary functionality."""
def test_summary_keys_present(self):
"""Summary should contain all expected keys."""
tracker = BudgetTracker(db_path=":memory:")
summary = tracker.get_summary()
assert "daily_usd" in summary
assert "monthly_usd" in summary
assert "daily_limit_usd" in summary
assert "monthly_limit_usd" in summary
assert "daily_ok" in summary
assert "monthly_ok" in summary
def test_summary_daily_ok_true_on_empty(self):
"""daily_ok and monthly_ok should be True when empty."""
tracker = BudgetTracker(db_path=":memory:")
summary = tracker.get_summary()
assert summary["daily_ok"] is True
assert summary["monthly_ok"] is True
def test_summary_daily_ok_false_when_exceeded(self):
"""daily_ok should be False when daily limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("openai", "gpt-4o", cost_usd=999.0)
summary = tracker.get_summary()
assert summary["daily_ok"] is False
def test_summary_monthly_ok_false_when_exceeded(self):
"""monthly_ok should be False when monthly limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0
mock_settings.tier_cloud_monthly_budget_usd = 10.0
tracker.record_spend("openai", "gpt-4o", cost_usd=15.0)
summary = tracker.get_summary()
assert summary["monthly_ok"] is False
def test_summary_values_rounded(self):
"""Summary values should be rounded appropriately."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("test", "model", cost_usd=1.123456789)
summary = tracker.get_summary()
# daily_usd should be rounded to 6 decimal places
assert summary["daily_usd"] == 1.123457
def test_summary_with_disabled_limits(self):
"""Summary should handle disabled limits (0)."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0
mock_settings.tier_cloud_monthly_budget_usd = 0
tracker.record_spend("test", "model", cost_usd=100.0)
summary = tracker.get_summary()
assert summary["daily_limit_usd"] == 0
assert summary["monthly_limit_usd"] == 0
assert summary["daily_ok"] is True
assert summary["monthly_ok"] is True
# ── Test BudgetTracker in-memory fallback ─────────────────────────────────────
class TestBudgetTrackerInMemoryFallback:
"""Tests for in-memory fallback when DB is unavailable."""
def test_in_memory_records_persisted(self):
"""Records should be stored in memory when DB is unavailable."""
tracker = BudgetTracker(db_path=":memory:")
# Force DB to appear unavailable
tracker._db_ok = False
tracker.record_spend("test", "model", cost_usd=0.01)
assert len(tracker._in_memory) == 1
assert tracker._in_memory[0].cost_usd == 0.01
def test_in_memory_query_spend(self):
"""Query spend should work with in-memory fallback."""
tracker = BudgetTracker(db_path=":memory:")
tracker._db_ok = False
tracker.record_spend("test", "model", cost_usd=0.01)
# Query should work from in-memory
since_ts = (datetime.now(UTC) - timedelta(hours=1)).timestamp()
result = tracker._query_spend(since_ts)
assert result == 0.01
def test_in_memory_older_records_not_counted(self):
"""In-memory records older than since_ts should not be counted."""
tracker = BudgetTracker(db_path=":memory:")
tracker._db_ok = False
old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
tracker._in_memory.append(
SpendRecord(old_ts, "test", "model", 0, 0, 1.0, "cloud")
)
# Query for records in last day
since_ts = (datetime.now(UTC) - timedelta(days=1)).timestamp()
result = tracker._query_spend(since_ts)
assert result == 0.0
# ── Test BudgetTracker thread safety ──────────────────────────────────────────
class TestBudgetTrackerThreadSafety:
"""Tests for thread-safe operations."""
def test_concurrent_record_spend(self):
"""Multiple threads should safely record spend concurrently."""
tracker = BudgetTracker(db_path=":memory:")
results = []
errors = []
def record_spends():
try:
for _ in range(10):
cost = tracker.record_spend("test", "model", cost_usd=0.01)
results.append(cost)
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=record_spends) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0
assert len(results) == 50
assert tracker.get_daily_spend() == pytest.approx(0.50, abs=1e-9)
# ── Test BudgetTracker edge cases ─────────────────────────────────────────────
class TestBudgetTrackerEdgeCases:
"""Tests for edge cases and boundary conditions."""
def test_very_small_cost(self):
"""Tracker should handle very small costs."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("test", "model", cost_usd=0.000001)
assert tracker.get_daily_spend() == pytest.approx(0.000001, abs=1e-9)
def test_very_large_cost(self):
"""Tracker should handle very large costs."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("test", "model", cost_usd=1_000_000.0)
assert tracker.get_daily_spend() == pytest.approx(1_000_000.0, abs=1e-9)
def test_many_records(self):
"""Tracker should handle many records efficiently."""
tracker = BudgetTracker(db_path=":memory:")
for i in range(100):
tracker.record_spend(f"provider_{i}", f"model_{i}", cost_usd=0.01)
assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)
def test_empty_provider_name(self):
"""Tracker should handle empty provider name."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("", "model", cost_usd=0.01)
assert cost == 0.01
def test_empty_model_name(self):
"""Tracker should handle empty model name."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("provider", "", cost_usd=0.01)
assert cost == 0.01
# ── Test get_budget_tracker singleton ─────────────────────────────────────────
class TestGetBudgetTrackerSingleton:
"""Tests for the module-level BudgetTracker singleton."""
def test_returns_budget_tracker(self):
"""Singleton should return a BudgetTracker instance."""
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
tracker = get_budget_tracker()
assert isinstance(tracker, BudgetTracker)
def test_returns_same_instance(self):
"""Singleton should return the same instance."""
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
t1 = get_budget_tracker()
t2 = get_budget_tracker()
assert t1 is t2
def test_singleton_persists_state(self):
"""Singleton should persist state across calls."""
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
tracker1 = get_budget_tracker()
# Record spend
tracker1.record_spend("test", "model", cost_usd=1.0)
# Get singleton again
tracker2 = get_budget_tracker()
assert tracker1 is tracker2
# ── Test BudgetTracker with mocked settings ───────────────────────────────────
class TestBudgetTrackerWithMockedSettings:
"""Tests using mocked settings for different scenarios."""
def test_high_daily_limit(self):
"""Test with high daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 1000.0
mock_settings.tier_cloud_monthly_budget_usd = 10000.0
tracker.record_spend("test", "model", cost_usd=500.0)
assert tracker.cloud_allowed() is True
def test_low_daily_limit(self):
"""Test with low daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 1.0
mock_settings.tier_cloud_monthly_budget_usd = 100.0
tracker.record_spend("test", "model", cost_usd=2.0)
assert tracker.cloud_allowed() is False
def test_only_monthly_limit_enabled(self):
"""Test with only monthly limit enabled."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0 # Disabled
mock_settings.tier_cloud_monthly_budget_usd = 50.0
tracker.record_spend("test", "model", cost_usd=30.0)
assert tracker.cloud_allowed() is True
tracker.record_spend("test", "model", cost_usd=25.0)
assert tracker.cloud_allowed() is False

View File

@@ -287,6 +287,148 @@ class TestJotNote:
assert "body is empty" in jot_note("title", " ")
class TestHotMemoryTimestamp:
"""Tests for Working RAM auto-updating timestamp (issue #10)."""
def test_read_includes_last_updated_when_facts_exist(self, tmp_path):
"""HotMemory.read() includes a 'Last updated' timestamp when DB has facts."""
db_path = tmp_path / "memory.db"
with (
patch("timmy.memory.db.DB_PATH", db_path),
patch("timmy.memory.crud.get_connection") as mock_conn,
):
import sqlite3
from contextlib import contextmanager
real_conn = sqlite3.connect(str(db_path))
real_conn.row_factory = sqlite3.Row
real_conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
memory_type TEXT NOT NULL DEFAULT 'fact',
source TEXT NOT NULL DEFAULT 'agent',
embedding TEXT, metadata TEXT, source_hash TEXT,
agent_id TEXT, task_id TEXT, session_id TEXT,
confidence REAL NOT NULL DEFAULT 0.8,
tags TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL,
last_accessed TEXT,
access_count INTEGER NOT NULL DEFAULT 0
)
""")
real_conn.execute(
"INSERT INTO memories (id, content, memory_type, source, created_at) "
"VALUES ('1', 'User prefers dark mode', 'fact', 'system', '2026-03-20T10:00:00+00:00')"
)
real_conn.commit()
@contextmanager
def fake_get_connection():
yield real_conn
mock_conn.side_effect = fake_get_connection
hot = HotMemory()
result = hot.read()
assert "> Last updated:" in result
assert "2026-03-20" in result
assert "User prefers dark mode" in result
def test_read_timestamp_reflects_most_recent_memory(self, tmp_path):
"""The timestamp in HotMemory.read() matches the latest memory's created_at."""
db_path = tmp_path / "memory.db"
with patch("timmy.memory.crud.get_connection") as mock_conn:
import sqlite3
from contextlib import contextmanager
real_conn = sqlite3.connect(str(db_path))
real_conn.row_factory = sqlite3.Row
real_conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
memory_type TEXT NOT NULL DEFAULT 'fact',
source TEXT NOT NULL DEFAULT 'agent',
embedding TEXT, metadata TEXT, source_hash TEXT,
agent_id TEXT, task_id TEXT, session_id TEXT,
confidence REAL NOT NULL DEFAULT 0.8,
tags TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL,
last_accessed TEXT,
access_count INTEGER NOT NULL DEFAULT 0
)
""")
# Older fact
real_conn.execute(
"INSERT INTO memories (id, content, memory_type, source, created_at) "
"VALUES ('1', 'old fact', 'fact', 'system', '2026-03-15T08:00:00+00:00')"
)
# Newer fact — this should be reflected in the timestamp
real_conn.execute(
"INSERT INTO memories (id, content, memory_type, source, created_at) "
"VALUES ('2', 'new fact', 'fact', 'system', '2026-03-23T14:30:00+00:00')"
)
real_conn.commit()
@contextmanager
def fake_get_connection():
yield real_conn
mock_conn.side_effect = fake_get_connection
hot = HotMemory()
result = hot.read()
assert "2026-03-23" in result
assert "> Last updated:" in result
def test_read_falls_back_to_file_when_db_empty(self, tmp_path):
"""HotMemory.read() falls back to MEMORY.md when DB has no facts or reflections."""
mem_file = tmp_path / "MEMORY.md"
mem_file.write_text("# Timmy Hot Memory\n\n## Current Status\n\nOperational\n")
with patch("timmy.memory.crud.get_connection") as mock_conn:
import sqlite3
from contextlib import contextmanager
db_path = tmp_path / "empty.db"
real_conn = sqlite3.connect(str(db_path))
real_conn.row_factory = sqlite3.Row
real_conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
memory_type TEXT NOT NULL DEFAULT 'fact',
source TEXT NOT NULL DEFAULT 'agent',
embedding TEXT, metadata TEXT, source_hash TEXT,
agent_id TEXT, task_id TEXT, session_id TEXT,
confidence REAL NOT NULL DEFAULT 0.8,
tags TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL,
last_accessed TEXT,
access_count INTEGER NOT NULL DEFAULT 0
)
""")
real_conn.commit()
@contextmanager
def fake_get_connection():
yield real_conn
mock_conn.side_effect = fake_get_connection
hot = HotMemory()
hot.path = mem_file
result = hot.read()
assert "Operational" in result
assert "> Last updated:" not in result
class TestLogDecision:
"""Tests for log_decision() artifact tool."""

View File

@@ -41,8 +41,6 @@ description = Static type checking with mypy
commands_pre =
deps =
mypy>=1.0.0
types-PyYAML
types-requests
commands =
mypy src --ignore-missing-imports --no-error-summary