1
0

Compare commits

...

3 Commits

Author SHA1 Message Date
Alexander Whitestone
13212b92b5 fix: auto-update Working RAM timestamp from DB activity
HotMemory.read() now includes a '> Last updated:' header showing the
created_at timestamp of the most recent memory in the database. This
means the Working RAM display automatically reflects the last inference
session rather than persisting the static timestamp from MEMORY.md.

- Add recall_last_activity_time() to timmy.memory.crud
- Update HotMemory.read() to include timestamp when DB has content
- Export recall_last_activity_time from timmy.memory_system
- Add TestHotMemoryTimestamp test class (3 tests)

Fixes #1351

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 22:57:36 -04:00
9e8e0f8552 [claude] Placeholder research artifact for issue #1341 (#1350) 2026-03-24 02:49:37 +00:00
e09082a8a8 [kimi] Add comprehensive unit tests for models/budget.py (#1316) (#1347) 2026-03-24 02:48:51 +00:00
6 changed files with 815 additions and 14 deletions

35
memory/research/task.md Normal file
View File

@@ -0,0 +1,35 @@
# Research Report: Task #1341
**Date:** 2026-03-23
**Issue:** [#1341](http://143.198.27.163:3000/Rockachopa/Timmy-time-dashboard/issues/1341)
**Priority:** normal
**Delegated by:** Timmy via Kimi delegation pipeline
---
## Summary
This issue was submitted as a placeholder via the Kimi delegation pipeline with unfilled template fields:
- **Research Question:** `Q?` (template default — no actual question provided)
- **Background / Context:** `ctx` (template default — no context provided)
- **Task:** `Task` (template default — no task specified)
## Findings
No actionable research question was specified. The issue appears to be a test or
accidental submission of an unfilled delegation template.
## Recommendations
1. **Re-open with a real question** if there is a specific topic to research.
2. **Review the delegation pipeline** to add validation that prevents empty/template-default
submissions from reaching the backlog (e.g. reject issues where the body contains
literal placeholder strings like `Q?` or `ctx`).
3. **Add a pipeline guard** in the Kimi delegation script to require non-empty, non-default
values for `Research Question` and `Background / Context` before creating an issue.
## Next Steps
- [ ] Add input validation to Kimi delegation pipeline
- [ ] Re-file with a concrete research question if needed

View File

@@ -9,7 +9,7 @@ import re
from datetime import UTC, datetime
from pathlib import Path
from timmy.memory.crud import recall_last_reflection, recall_personal_facts
from timmy.memory.crud import recall_last_activity_time, recall_last_reflection, recall_personal_facts
from timmy.memory.db import HOT_MEMORY_PATH, VAULT_PATH
logger = logging.getLogger(__name__)
@@ -89,25 +89,41 @@ class HotMemory:
"""Read hot memory — computed view of top facts + last reflection from DB."""
try:
facts = recall_personal_facts()
lines = ["# Timmy Hot Memory\n"]
if facts:
lines.append("## Known Facts\n")
for f in facts[:15]:
lines.append(f"- {f}")
# Include the last reflection if available
reflection = recall_last_reflection()
if reflection:
lines.append("\n## Last Reflection\n")
lines.append(reflection)
if len(lines) > 1:
if facts or reflection:
last_ts = recall_last_activity_time()
try:
updated_date = datetime.fromisoformat(last_ts).strftime("%Y-%m-%d %H:%M UTC")
except (TypeError, ValueError):
updated_date = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
lines = [
"# Timmy Hot Memory",
"",
"> Working RAM — always loaded, ~300 lines max, pruned monthly",
f"> Last updated: {updated_date}",
"",
]
if facts:
lines.append("## Known Facts")
lines.append("")
for f in facts[:15]:
lines.append(f"- {f}")
if reflection:
lines.append("")
lines.append("## Last Reflection")
lines.append("")
lines.append(reflection)
return "\n".join(lines)
except Exception:
logger.debug("DB context read failed, falling back to file")
# Fallback to file if DB unavailable
# Fallback to file if DB unavailable or empty
if self.path.exists():
return self.path.read_text()

View File

@@ -393,3 +393,12 @@ def recall_last_reflection() -> str | None:
"ORDER BY created_at DESC LIMIT 1"
).fetchone()
return row["content"] if row else None
def recall_last_activity_time() -> str | None:
"""Return the ISO timestamp of the most recently stored memory, or None."""
with get_connection() as conn:
row = conn.execute(
"SELECT created_at FROM memories ORDER BY created_at DESC LIMIT 1"
).fetchone()
return row["created_at"] if row else None

View File

@@ -27,6 +27,7 @@ from timmy.memory.crud import ( # noqa: F401
get_memory_context,
get_memory_stats,
prune_memories,
recall_last_activity_time,
recall_last_reflection,
recall_personal_facts,
recall_personal_facts_with_ids,

View File

@@ -0,0 +1,598 @@
"""Unit tests for models/budget.py — comprehensive coverage for budget management.
Tests budget allocation, tracking, limit enforcement, and edge cases including:
- Zero budget scenarios
- Over-budget handling
- Budget reset behavior
- In-memory fallback when DB is unavailable
"""
import threading
import time
from datetime import UTC, date, datetime, timedelta
from unittest.mock import patch
import pytest
from infrastructure.models.budget import (
BudgetTracker,
SpendRecord,
estimate_cost_usd,
get_budget_tracker,
)
pytestmark = pytest.mark.unit
# ── Test SpendRecord dataclass ────────────────────────────────────────────────
class TestSpendRecord:
"""Tests for the SpendRecord dataclass."""
def test_spend_record_creation(self):
"""Test creating a SpendRecord with all fields."""
ts = time.time()
record = SpendRecord(
ts=ts,
provider="anthropic",
model="claude-haiku-4-5",
tokens_in=100,
tokens_out=200,
cost_usd=0.001,
tier="cloud",
)
assert record.ts == ts
assert record.provider == "anthropic"
assert record.model == "claude-haiku-4-5"
assert record.tokens_in == 100
assert record.tokens_out == 200
assert record.cost_usd == 0.001
assert record.tier == "cloud"
def test_spend_record_with_zero_tokens(self):
"""Test SpendRecord with zero tokens."""
ts = time.time()
record = SpendRecord(ts=ts, provider="openai", model="gpt-4o", tokens_in=0, tokens_out=0, cost_usd=0.0, tier="cloud")
assert record.tokens_in == 0
assert record.tokens_out == 0
# ── Test estimate_cost_usd function ───────────────────────────────────────────
class TestEstimateCostUsd:
"""Tests for the estimate_cost_usd function."""
def test_haiku_cheaper_than_sonnet(self):
"""Haiku should be cheaper than Sonnet for same tokens."""
haiku_cost = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
sonnet_cost = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
assert haiku_cost < sonnet_cost
def test_zero_tokens_is_zero_cost(self):
"""Zero tokens should result in zero cost."""
assert estimate_cost_usd("gpt-4o", 0, 0) == 0.0
def test_only_input_tokens(self):
"""Cost calculation with only input tokens."""
cost = estimate_cost_usd("gpt-4o", 1000, 0)
expected = (1000 * 0.0025) / 1000.0 # $0.0025 per 1K input tokens
assert cost == pytest.approx(expected)
def test_only_output_tokens(self):
"""Cost calculation with only output tokens."""
cost = estimate_cost_usd("gpt-4o", 0, 1000)
expected = (1000 * 0.01) / 1000.0 # $0.01 per 1K output tokens
assert cost == pytest.approx(expected)
def test_unknown_model_uses_default(self):
"""Unknown model should use conservative default cost."""
cost = estimate_cost_usd("some-unknown-model-xyz", 1000, 1000)
assert cost > 0 # Uses conservative default, not zero
# Default is 0.003 input, 0.015 output per 1K
expected = (1000 * 0.003 + 1000 * 0.015) / 1000.0
assert cost == pytest.approx(expected)
def test_versioned_model_name_matches(self):
"""Versioned model names should match base model rates."""
cost1 = estimate_cost_usd("claude-haiku-4-5-20251001", 1000, 0)
cost2 = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
assert cost1 == cost2
def test_gpt4o_mini_cheaper_than_gpt4o(self):
"""GPT-4o mini should be cheaper than GPT-4o."""
mini = estimate_cost_usd("gpt-4o-mini", 1000, 1000)
full = estimate_cost_usd("gpt-4o", 1000, 1000)
assert mini < full
def test_opus_most_expensive_claude(self):
"""Opus should be the most expensive Claude model."""
opus = estimate_cost_usd("claude-opus-4-5", 1000, 1000)
sonnet = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
haiku = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
assert opus > sonnet > haiku
def test_grok_variants(self):
"""Test Grok model cost estimation."""
cost = estimate_cost_usd("grok-3", 1000, 1000)
assert cost > 0
cost_fast = estimate_cost_usd("grok-3-fast", 1000, 1000)
assert cost_fast > 0
def test_case_insensitive_matching(self):
"""Model name matching should be case insensitive."""
cost_lower = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
cost_upper = estimate_cost_usd("CLAUDE-HAIKU-4-5", 1000, 0)
cost_mixed = estimate_cost_usd("Claude-Haiku-4-5", 1000, 0)
assert cost_lower == cost_upper == cost_mixed
def test_returns_float(self):
"""Function should always return a float."""
assert isinstance(estimate_cost_usd("haiku", 100, 200), float)
assert isinstance(estimate_cost_usd("unknown-model", 100, 200), float)
assert isinstance(estimate_cost_usd("haiku", 0, 0), float)
# ── Test BudgetTracker initialization ─────────────────────────────────────────
class TestBudgetTrackerInit:
"""Tests for BudgetTracker initialization."""
def test_creates_with_memory_db(self):
"""Tracker should initialize with in-memory database."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker._db_ok is True
def test_in_memory_fallback_empty_on_creation(self):
"""In-memory fallback should start empty."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker._in_memory == []
def test_custom_db_path(self, tmp_path):
"""Tracker should use custom database path."""
db_file = tmp_path / "custom_budget.db"
tracker = BudgetTracker(db_path=str(db_file))
assert tracker._db_ok is True
assert tracker._db_path == str(db_file)
assert db_file.exists()
def test_db_path_directory_creation(self, tmp_path):
"""Tracker should create parent directories if needed."""
db_file = tmp_path / "nested" / "dirs" / "budget.db"
tracker = BudgetTracker(db_path=str(db_file))
assert tracker._db_ok is True
assert db_file.parent.exists()
def test_invalid_db_path_fallback(self):
"""Tracker should fallback to in-memory on invalid path."""
# Use a path that cannot be created (e.g., permission denied simulation)
tracker = BudgetTracker.__new__(BudgetTracker)
tracker._db_path = "/nonexistent/invalid/path/budget.db"
tracker._lock = threading.Lock()
tracker._in_memory = []
tracker._db_ok = False
# Should still work with in-memory fallback
cost = tracker.record_spend("test", "model", cost_usd=0.01)
assert cost == 0.01
# ── Test BudgetTracker record_spend ───────────────────────────────────────────
class TestBudgetTrackerRecordSpend:
"""Tests for recording spend events."""
def test_record_spend_returns_cost(self):
"""record_spend should return the calculated cost."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "claude-haiku-4-5", 100, 200)
assert cost > 0
def test_record_spend_explicit_cost(self):
"""record_spend should use explicit cost when provided."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "model", cost_usd=1.23)
assert cost == pytest.approx(1.23)
def test_record_spend_accumulates(self):
"""Multiple spend records should accumulate correctly."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
tracker.record_spend("openai", "gpt-4o", cost_usd=0.02)
assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)
def test_record_spend_with_tier_label(self):
"""record_spend should accept custom tier labels."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("anthropic", "haiku", tier="cloud_api")
assert cost >= 0
def test_record_spend_with_provider(self):
"""record_spend should track provider correctly."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
tracker.record_spend("anthropic", "claude-haiku", cost_usd=0.02)
assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)
def test_record_zero_cost(self):
"""Recording zero cost should work correctly."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("test", "model", cost_usd=0.0)
assert cost == 0.0
assert tracker.get_daily_spend() == 0.0
def test_record_negative_cost(self):
"""Recording negative cost (refund) should work."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("test", "model", cost_usd=-0.50)
assert cost == -0.50
assert tracker.get_daily_spend() == -0.50
# ── Test BudgetTracker daily/monthly spend queries ────────────────────────────
class TestBudgetTrackerSpendQueries:
"""Tests for daily and monthly spend queries."""
def test_monthly_spend_includes_daily(self):
"""Monthly spend should be >= daily spend."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=5.00)
assert tracker.get_monthly_spend() >= tracker.get_daily_spend()
def test_get_daily_spend_empty(self):
"""Daily spend should be zero when no records."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker.get_daily_spend() == 0.0
def test_get_monthly_spend_empty(self):
"""Monthly spend should be zero when no records."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker.get_monthly_spend() == 0.0
def test_daily_spend_isolation(self):
"""Daily spend should only include today's records, not old ones."""
tracker = BudgetTracker(db_path=":memory:")
# Force use of in-memory fallback
tracker._db_ok = False
# Add record for today
today_ts = datetime.combine(date.today(), datetime.min.time(), tzinfo=UTC).timestamp()
tracker._in_memory.append(
SpendRecord(today_ts + 3600, "test", "model", 0, 0, 1.0, "cloud")
)
# Add old record (2 days ago)
old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
tracker._in_memory.append(
SpendRecord(old_ts, "test", "old_model", 0, 0, 2.0, "cloud")
)
# Daily should only include today's 1.0
assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)
# Monthly should include both (both are in current month)
assert tracker.get_monthly_spend() == pytest.approx(3.0, abs=1e-9)
# ── Test BudgetTracker cloud_allowed ──────────────────────────────────────────
class TestBudgetTrackerCloudAllowed:
"""Tests for cloud budget limit enforcement."""
def test_allowed_when_no_spend(self):
"""Cloud should be allowed when no spend recorded."""
tracker = BudgetTracker(db_path=":memory:")
assert tracker.cloud_allowed() is True
def test_blocked_when_daily_limit_exceeded(self):
"""Cloud should be blocked when daily limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
# With default daily limit of 5.0, 999 should block
assert tracker.cloud_allowed() is False
def test_allowed_when_daily_limit_zero(self):
"""Cloud should be allowed when daily limit is 0 (disabled)."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0 # disabled
mock_settings.tier_cloud_monthly_budget_usd = 0 # disabled
assert tracker.cloud_allowed() is True
def test_blocked_when_monthly_limit_exceeded(self):
"""Cloud should be blocked when monthly limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0 # daily disabled
mock_settings.tier_cloud_monthly_budget_usd = 10.0
assert tracker.cloud_allowed() is False
def test_allowed_at_exact_daily_limit(self):
"""Cloud should be allowed when exactly at daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 5.0
mock_settings.tier_cloud_monthly_budget_usd = 0
# Record exactly at limit
tracker.record_spend("test", "model", cost_usd=5.0)
# At exactly the limit, it should return False (blocked)
# because spend >= limit
assert tracker.cloud_allowed() is False
def test_allowed_below_daily_limit(self):
"""Cloud should be allowed when below daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 5.0
mock_settings.tier_cloud_monthly_budget_usd = 0
tracker.record_spend("test", "model", cost_usd=4.99)
assert tracker.cloud_allowed() is True
def test_zero_budget_blocks_all(self):
"""Zero budget should block all cloud usage."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0.01 # Very small budget
mock_settings.tier_cloud_monthly_budget_usd = 0
tracker.record_spend("test", "model", cost_usd=0.02)
# Over the tiny budget, should be blocked
assert tracker.cloud_allowed() is False
def test_both_limits_checked(self):
"""Both daily and monthly limits should be checked."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 100.0
mock_settings.tier_cloud_monthly_budget_usd = 10.0
tracker.record_spend("test", "model", cost_usd=15.0)
# Under daily but over monthly
assert tracker.cloud_allowed() is False
# ── Test BudgetTracker summary ────────────────────────────────────────────────
class TestBudgetTrackerSummary:
"""Tests for budget summary functionality."""
def test_summary_keys_present(self):
"""Summary should contain all expected keys."""
tracker = BudgetTracker(db_path=":memory:")
summary = tracker.get_summary()
assert "daily_usd" in summary
assert "monthly_usd" in summary
assert "daily_limit_usd" in summary
assert "monthly_limit_usd" in summary
assert "daily_ok" in summary
assert "monthly_ok" in summary
def test_summary_daily_ok_true_on_empty(self):
"""daily_ok and monthly_ok should be True when empty."""
tracker = BudgetTracker(db_path=":memory:")
summary = tracker.get_summary()
assert summary["daily_ok"] is True
assert summary["monthly_ok"] is True
def test_summary_daily_ok_false_when_exceeded(self):
"""daily_ok should be False when daily limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("openai", "gpt-4o", cost_usd=999.0)
summary = tracker.get_summary()
assert summary["daily_ok"] is False
def test_summary_monthly_ok_false_when_exceeded(self):
"""monthly_ok should be False when monthly limit exceeded."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0
mock_settings.tier_cloud_monthly_budget_usd = 10.0
tracker.record_spend("openai", "gpt-4o", cost_usd=15.0)
summary = tracker.get_summary()
assert summary["monthly_ok"] is False
def test_summary_values_rounded(self):
"""Summary values should be rounded appropriately."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("test", "model", cost_usd=1.123456789)
summary = tracker.get_summary()
# daily_usd should be rounded to 6 decimal places
assert summary["daily_usd"] == 1.123457
def test_summary_with_disabled_limits(self):
"""Summary should handle disabled limits (0)."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0
mock_settings.tier_cloud_monthly_budget_usd = 0
tracker.record_spend("test", "model", cost_usd=100.0)
summary = tracker.get_summary()
assert summary["daily_limit_usd"] == 0
assert summary["monthly_limit_usd"] == 0
assert summary["daily_ok"] is True
assert summary["monthly_ok"] is True
# ── Test BudgetTracker in-memory fallback ─────────────────────────────────────
class TestBudgetTrackerInMemoryFallback:
"""Tests for in-memory fallback when DB is unavailable."""
def test_in_memory_records_persisted(self):
"""Records should be stored in memory when DB is unavailable."""
tracker = BudgetTracker(db_path=":memory:")
# Force DB to appear unavailable
tracker._db_ok = False
tracker.record_spend("test", "model", cost_usd=0.01)
assert len(tracker._in_memory) == 1
assert tracker._in_memory[0].cost_usd == 0.01
def test_in_memory_query_spend(self):
"""Query spend should work with in-memory fallback."""
tracker = BudgetTracker(db_path=":memory:")
tracker._db_ok = False
tracker.record_spend("test", "model", cost_usd=0.01)
# Query should work from in-memory
since_ts = (datetime.now(UTC) - timedelta(hours=1)).timestamp()
result = tracker._query_spend(since_ts)
assert result == 0.01
def test_in_memory_older_records_not_counted(self):
"""In-memory records older than since_ts should not be counted."""
tracker = BudgetTracker(db_path=":memory:")
tracker._db_ok = False
old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
tracker._in_memory.append(
SpendRecord(old_ts, "test", "model", 0, 0, 1.0, "cloud")
)
# Query for records in last day
since_ts = (datetime.now(UTC) - timedelta(days=1)).timestamp()
result = tracker._query_spend(since_ts)
assert result == 0.0
# ── Test BudgetTracker thread safety ──────────────────────────────────────────
class TestBudgetTrackerThreadSafety:
"""Tests for thread-safe operations."""
def test_concurrent_record_spend(self):
"""Multiple threads should safely record spend concurrently."""
tracker = BudgetTracker(db_path=":memory:")
results = []
errors = []
def record_spends():
try:
for _ in range(10):
cost = tracker.record_spend("test", "model", cost_usd=0.01)
results.append(cost)
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=record_spends) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0
assert len(results) == 50
assert tracker.get_daily_spend() == pytest.approx(0.50, abs=1e-9)
# ── Test BudgetTracker edge cases ─────────────────────────────────────────────
class TestBudgetTrackerEdgeCases:
"""Tests for edge cases and boundary conditions."""
def test_very_small_cost(self):
"""Tracker should handle very small costs."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("test", "model", cost_usd=0.000001)
assert tracker.get_daily_spend() == pytest.approx(0.000001, abs=1e-9)
def test_very_large_cost(self):
"""Tracker should handle very large costs."""
tracker = BudgetTracker(db_path=":memory:")
tracker.record_spend("test", "model", cost_usd=1_000_000.0)
assert tracker.get_daily_spend() == pytest.approx(1_000_000.0, abs=1e-9)
def test_many_records(self):
"""Tracker should handle many records efficiently."""
tracker = BudgetTracker(db_path=":memory:")
for i in range(100):
tracker.record_spend(f"provider_{i}", f"model_{i}", cost_usd=0.01)
assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)
def test_empty_provider_name(self):
"""Tracker should handle empty provider name."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("", "model", cost_usd=0.01)
assert cost == 0.01
def test_empty_model_name(self):
"""Tracker should handle empty model name."""
tracker = BudgetTracker(db_path=":memory:")
cost = tracker.record_spend("provider", "", cost_usd=0.01)
assert cost == 0.01
# ── Test get_budget_tracker singleton ─────────────────────────────────────────
class TestGetBudgetTrackerSingleton:
"""Tests for the module-level BudgetTracker singleton."""
def test_returns_budget_tracker(self):
"""Singleton should return a BudgetTracker instance."""
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
tracker = get_budget_tracker()
assert isinstance(tracker, BudgetTracker)
def test_returns_same_instance(self):
"""Singleton should return the same instance."""
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
t1 = get_budget_tracker()
t2 = get_budget_tracker()
assert t1 is t2
def test_singleton_persists_state(self):
"""Singleton should persist state across calls."""
import infrastructure.models.budget as bmod
bmod._budget_tracker = None
tracker1 = get_budget_tracker()
# Record spend
tracker1.record_spend("test", "model", cost_usd=1.0)
# Get singleton again
tracker2 = get_budget_tracker()
assert tracker1 is tracker2
# ── Test BudgetTracker with mocked settings ───────────────────────────────────
class TestBudgetTrackerWithMockedSettings:
"""Tests using mocked settings for different scenarios."""
def test_high_daily_limit(self):
"""Test with high daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 1000.0
mock_settings.tier_cloud_monthly_budget_usd = 10000.0
tracker.record_spend("test", "model", cost_usd=500.0)
assert tracker.cloud_allowed() is True
def test_low_daily_limit(self):
"""Test with low daily limit."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 1.0
mock_settings.tier_cloud_monthly_budget_usd = 100.0
tracker.record_spend("test", "model", cost_usd=2.0)
assert tracker.cloud_allowed() is False
def test_only_monthly_limit_enabled(self):
"""Test with only monthly limit enabled."""
tracker = BudgetTracker(db_path=":memory:")
with patch("infrastructure.models.budget.settings") as mock_settings:
mock_settings.tier_cloud_daily_budget_usd = 0 # Disabled
mock_settings.tier_cloud_monthly_budget_usd = 50.0
tracker.record_spend("test", "model", cost_usd=30.0)
assert tracker.cloud_allowed() is True
tracker.record_spend("test", "model", cost_usd=25.0)
assert tracker.cloud_allowed() is False

View File

@@ -287,6 +287,148 @@ class TestJotNote:
assert "body is empty" in jot_note("title", " ")
class TestHotMemoryTimestamp:
"""Tests for Working RAM auto-updating timestamp (issue #10)."""
def test_read_includes_last_updated_when_facts_exist(self, tmp_path):
"""HotMemory.read() includes a 'Last updated' timestamp when DB has facts."""
db_path = tmp_path / "memory.db"
with (
patch("timmy.memory.db.DB_PATH", db_path),
patch("timmy.memory.crud.get_connection") as mock_conn,
):
import sqlite3
from contextlib import contextmanager
real_conn = sqlite3.connect(str(db_path))
real_conn.row_factory = sqlite3.Row
real_conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
memory_type TEXT NOT NULL DEFAULT 'fact',
source TEXT NOT NULL DEFAULT 'agent',
embedding TEXT, metadata TEXT, source_hash TEXT,
agent_id TEXT, task_id TEXT, session_id TEXT,
confidence REAL NOT NULL DEFAULT 0.8,
tags TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL,
last_accessed TEXT,
access_count INTEGER NOT NULL DEFAULT 0
)
""")
real_conn.execute(
"INSERT INTO memories (id, content, memory_type, source, created_at) "
"VALUES ('1', 'User prefers dark mode', 'fact', 'system', '2026-03-20T10:00:00+00:00')"
)
real_conn.commit()
@contextmanager
def fake_get_connection():
yield real_conn
mock_conn.side_effect = fake_get_connection
hot = HotMemory()
result = hot.read()
assert "> Last updated:" in result
assert "2026-03-20" in result
assert "User prefers dark mode" in result
def test_read_timestamp_reflects_most_recent_memory(self, tmp_path):
"""The timestamp in HotMemory.read() matches the latest memory's created_at."""
db_path = tmp_path / "memory.db"
with patch("timmy.memory.crud.get_connection") as mock_conn:
import sqlite3
from contextlib import contextmanager
real_conn = sqlite3.connect(str(db_path))
real_conn.row_factory = sqlite3.Row
real_conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
memory_type TEXT NOT NULL DEFAULT 'fact',
source TEXT NOT NULL DEFAULT 'agent',
embedding TEXT, metadata TEXT, source_hash TEXT,
agent_id TEXT, task_id TEXT, session_id TEXT,
confidence REAL NOT NULL DEFAULT 0.8,
tags TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL,
last_accessed TEXT,
access_count INTEGER NOT NULL DEFAULT 0
)
""")
# Older fact
real_conn.execute(
"INSERT INTO memories (id, content, memory_type, source, created_at) "
"VALUES ('1', 'old fact', 'fact', 'system', '2026-03-15T08:00:00+00:00')"
)
# Newer fact — this should be reflected in the timestamp
real_conn.execute(
"INSERT INTO memories (id, content, memory_type, source, created_at) "
"VALUES ('2', 'new fact', 'fact', 'system', '2026-03-23T14:30:00+00:00')"
)
real_conn.commit()
@contextmanager
def fake_get_connection():
yield real_conn
mock_conn.side_effect = fake_get_connection
hot = HotMemory()
result = hot.read()
assert "2026-03-23" in result
assert "> Last updated:" in result
def test_read_falls_back_to_file_when_db_empty(self, tmp_path):
"""HotMemory.read() falls back to MEMORY.md when DB has no facts or reflections."""
mem_file = tmp_path / "MEMORY.md"
mem_file.write_text("# Timmy Hot Memory\n\n## Current Status\n\nOperational\n")
with patch("timmy.memory.crud.get_connection") as mock_conn:
import sqlite3
from contextlib import contextmanager
db_path = tmp_path / "empty.db"
real_conn = sqlite3.connect(str(db_path))
real_conn.row_factory = sqlite3.Row
real_conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
memory_type TEXT NOT NULL DEFAULT 'fact',
source TEXT NOT NULL DEFAULT 'agent',
embedding TEXT, metadata TEXT, source_hash TEXT,
agent_id TEXT, task_id TEXT, session_id TEXT,
confidence REAL NOT NULL DEFAULT 0.8,
tags TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL,
last_accessed TEXT,
access_count INTEGER NOT NULL DEFAULT 0
)
""")
real_conn.commit()
@contextmanager
def fake_get_connection():
yield real_conn
mock_conn.side_effect = fake_get_connection
hot = HotMemory()
hot.path = mem_file
result = hot.read()
assert "Operational" in result
assert "> Last updated:" not in result
class TestLogDecision:
"""Tests for log_decision() artifact tool."""