[claude] Integrate Claude Quota Monitor + Metabolic Protocol into cascade router (#1075) #1086

Merged
claude merged 1 commits from claude/issue-1075 into main 2026-03-23 15:18:11 +00:00
4 changed files with 752 additions and 0 deletions

186
scripts/claude_quota_check.sh Executable file
View File

@@ -0,0 +1,186 @@
#!/bin/bash
# ═══════════════════════════════════════════════════════════════
# claude_quota_check.sh — Check Claude Code / Claude.ai quota
#
# Usage:
# ./claude_quota_check.sh # Human-readable output
# ./claude_quota_check.sh --json # Raw JSON for piping
# ./claude_quota_check.sh --watch # Refresh every 60s
#
# Requires: macOS with Claude Code authenticated, python3
# Token is read from macOS Keychain (same as Claude Code uses)
# ═══════════════════════════════════════════════════════════════
set -euo pipefail
# ── Extract OAuth token from macOS Keychain ──
get_token() {
local creds
creds=$(security find-generic-password -s "Claude Code-credentials" -w 2>/dev/null) || {
echo "ERROR: No Claude Code credentials found in Keychain." >&2
echo "Run 'claude' and authenticate first." >&2
exit 1
}
echo "$creds" | python3 -c "
import sys, json
data = json.load(sys.stdin)
oauth = data.get('claudeAiOauth', data)
print(oauth['accessToken'])
" 2>/dev/null || {
echo "ERROR: Could not parse credentials JSON." >&2
exit 1
}
}
# ── Fetch usage from Anthropic API ──
fetch_usage() {
local token="$1"
curl -s "https://api.anthropic.com/api/oauth/usage" \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-H "User-Agent: claude-code/2.0.32" \
-H "Authorization: Bearer ${token}" \
-H "anthropic-beta: oauth-2025-04-20"
}
# ── Format time remaining ──
time_remaining() {
local reset_at="$1"
if [ -z "$reset_at" ] || [ "$reset_at" = "null" ]; then
echo "unknown"
return
fi
python3 -c "
from datetime import datetime, timezone
reset = datetime.fromisoformat('${reset_at}'.replace('Z', '+00:00'))
now = datetime.now(timezone.utc)
diff = reset - now
if diff.total_seconds() <= 0:
print('resetting now')
else:
hours = int(diff.total_seconds() // 3600)
mins = int((diff.total_seconds() % 3600) // 60)
if hours > 0:
print(f'{hours}h {mins}m')
else:
print(f'{mins}m')
" 2>/dev/null || echo "unknown"
}
# ── Bar visualization ──
usage_bar() {
local pct=$1
local width=30
local filled
filled=$(python3 -c "print(int(${pct} * ${width}))")
local empty=$((width - filled))
# Color: green < 50%, yellow 50-80%, red > 80%
local color=""
if (( $(echo "$pct < 0.50" | bc -l) )); then
color="\033[32m" # green
elif (( $(echo "$pct < 0.80" | bc -l) )); then
color="\033[33m" # yellow
else
color="\033[31m" # red
fi
printf "${color}"
for ((i=0; i<filled; i++)); do printf "█"; done
printf "\033[90m"
for ((i=0; i<empty; i++)); do printf "░"; done
printf "\033[0m"
}
# ── Display formatted output ──
display() {
local usage_json="$1"
local now
now=$(date "+%Y-%m-%d %H:%M:%S %Z")
local five_util five_reset seven_util seven_reset
five_util=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('five_hour') or {}; print(h.get('utilization', 0))" 2>/dev/null || echo "0")
five_reset=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('five_hour') or {}; print(h.get('resets_at', 'null'))" 2>/dev/null || echo "null")
seven_util=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('seven_day') or {}; print(h.get('utilization', 0))" 2>/dev/null || echo "0")
seven_reset=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('seven_day') or {}; print(h.get('resets_at', 'null'))" 2>/dev/null || echo "null")
local five_pct seven_pct
five_pct=$(python3 -c "print(int(float('${five_util}') * 100))")
seven_pct=$(python3 -c "print(int(float('${seven_util}') * 100))")
local five_remaining seven_remaining
five_remaining=$(time_remaining "$five_reset")
seven_remaining=$(time_remaining "$seven_reset")
echo ""
echo " ┌─────────────────────────────────────────────┐"
echo " │ CLAUDE QUOTA STATUS │"
printf " │ %-38s│\n" "$now"
echo " ├─────────────────────────────────────────────┤"
printf " │ 5-hour window: "
usage_bar "$five_util"
printf " %3d%% │\n" "$five_pct"
printf " │ Resets in: %-33s│\n" "$five_remaining"
echo " │ │"
printf " │ 7-day window: "
usage_bar "$seven_util"
printf " %3d%% │\n" "$seven_pct"
printf " │ Resets in: %-33s│\n" "$seven_remaining"
echo " └─────────────────────────────────────────────┘"
echo ""
# Decision guidance for Timmy
if (( five_pct >= 80 )); then
echo " ⚠ 5-hour window critical. Switch to local Qwen3-14B."
echo " Reserve remaining quota for high-value tasks only."
elif (( five_pct >= 50 )); then
echo " ~ 5-hour window half spent. Batch remaining requests."
else
echo " ✓ 5-hour window healthy. Full speed ahead."
fi
if (( seven_pct >= 80 )); then
echo " ⚠ Weekly quota critical! Operate in local-only mode."
elif (( seven_pct >= 60 )); then
echo " ~ Weekly quota past 60%. Plan usage carefully."
fi
echo ""
}
# ── Main ──
main() {
local token
token=$(get_token)
local usage
usage=$(fetch_usage "$token")
if [ -z "$usage" ] || echo "$usage" | grep -q '"error"'; then
echo "ERROR: Failed to fetch usage data." >&2
echo "$usage" >&2
exit 1
fi
case "${1:-}" in
--json)
echo "$usage" | python3 -m json.tool
;;
--watch)
while true; do
clear
usage=$(fetch_usage "$token")
display "$usage"
echo " Refreshing in 60s... (Ctrl+C to stop)"
sleep 60
done
;;
*)
display "$usage"
;;
esac
}
main "$@"

View File

@@ -0,0 +1,260 @@
"""
claude_quota.py — Claude Code / Claude.ai Quota Monitor
Drop into src/infrastructure/ in the Timmy Time Dashboard repo.
Provides real-time quota visibility and metabolic protocol decisions.
Usage:
from infrastructure.claude_quota import QuotaMonitor
monitor = QuotaMonitor()
status = monitor.check()
print(status.five_hour_pct) # 42
print(status.five_hour_resets_in) # "2h 15m"
print(status.seven_day_pct) # 29
print(status.recommended_tier) # MetabolicTier.BURST
# Metabolic protocol: auto-select model based on quota
model = monitor.select_model(task_complexity="high")
# Returns "claude-sonnet-4-6" if quota allows, else "qwen3:14b"
"""
import json
import logging
import subprocess
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class MetabolicTier(str, Enum):
"""The three-tier metabolic protocol from the Timmy Time architecture."""
BURST = "burst" # Cloud API (Claude/Groq) — expensive, best quality
ACTIVE = "active" # Local 14B (Qwen3-14B) — free, good quality
RESTING = "resting" # Local 8B (Qwen3-8B) — free, fast, adequate
@dataclass
class QuotaStatus:
"""Current Claude quota state."""
five_hour_utilization: float # 0.0 to 1.0
five_hour_resets_at: Optional[str]
seven_day_utilization: float # 0.0 to 1.0
seven_day_resets_at: Optional[str]
raw_response: dict
fetched_at: datetime
@property
def five_hour_pct(self) -> int:
return int(self.five_hour_utilization * 100)
@property
def seven_day_pct(self) -> int:
return int(self.seven_day_utilization * 100)
@property
def five_hour_resets_in(self) -> str:
return _time_remaining(self.five_hour_resets_at)
@property
def seven_day_resets_in(self) -> str:
return _time_remaining(self.seven_day_resets_at)
@property
def recommended_tier(self) -> MetabolicTier:
"""Metabolic protocol: determine which inference tier to use."""
# If weekly quota is critical, go full local
if self.seven_day_utilization >= 0.80:
return MetabolicTier.RESTING
# If 5-hour window is critical or past half, use local
if self.five_hour_utilization >= 0.50:
return MetabolicTier.ACTIVE
# Quota healthy — cloud available for high-value tasks
return MetabolicTier.BURST
def summary(self) -> str:
"""Human-readable status string."""
return (
f"5h: {self.five_hour_pct}% (resets {self.five_hour_resets_in}) | "
f"7d: {self.seven_day_pct}% (resets {self.seven_day_resets_in}) | "
f"tier: {self.recommended_tier.value}"
)
class QuotaMonitor:
"""
Monitors Claude Code / Claude.ai quota via the internal OAuth API.
The token is read from macOS Keychain where Claude Code stores it.
Falls back gracefully if credentials aren't available (e.g., on Linux VPS).
"""
API_URL = "https://api.anthropic.com/api/oauth/usage"
KEYCHAIN_SERVICE = "Claude Code-credentials"
USER_AGENT = "claude-code/2.0.32"
def __init__(self) -> None:
self._token: Optional[str] = None
self._last_status: Optional[QuotaStatus] = None
self._cache_seconds = 30 # Don't hammer the API
def _get_token(self) -> Optional[str]:
"""Extract OAuth token from macOS Keychain."""
if self._token:
return self._token
try:
result = subprocess.run(
["security", "find-generic-password", "-s", self.KEYCHAIN_SERVICE, "-w"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0:
logger.warning("Claude Code credentials not found in Keychain")
return None
creds = json.loads(result.stdout.strip())
oauth = creds.get("claudeAiOauth", creds)
self._token = oauth.get("accessToken")
return self._token
except (json.JSONDecodeError, KeyError, FileNotFoundError, subprocess.TimeoutExpired) as exc:
logger.warning("Could not read Claude Code credentials: %s", exc)
return None
def check(self, force: bool = False) -> Optional[QuotaStatus]:
"""
Fetch current quota status.
Returns None if credentials aren't available (graceful degradation).
Caches results for 30 seconds to avoid rate limiting the quota API itself.
"""
# Return cached if fresh
if not force and self._last_status:
age = (datetime.now(timezone.utc) - self._last_status.fetched_at).total_seconds()
if age < self._cache_seconds:
return self._last_status
token = self._get_token()
if not token:
return None
try:
req = urllib.request.Request(
self.API_URL,
headers={
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": self.USER_AGENT,
"Authorization": f"Bearer {token}",
"anthropic-beta": "oauth-2025-04-20",
},
)
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode())
five_hour = data.get("five_hour") or {}
seven_day = data.get("seven_day") or {}
self._last_status = QuotaStatus(
five_hour_utilization=float(five_hour.get("utilization", 0.0)),
five_hour_resets_at=five_hour.get("resets_at"),
seven_day_utilization=float(seven_day.get("utilization", 0.0)),
seven_day_resets_at=seven_day.get("resets_at"),
raw_response=data,
fetched_at=datetime.now(timezone.utc),
)
return self._last_status
except Exception as exc:
logger.warning("Failed to fetch quota: %s", exc)
return self._last_status # Return stale data if available
def select_model(self, task_complexity: str = "medium") -> str:
"""
Metabolic protocol: select the right model based on quota + task complexity.
Returns an Ollama model tag or "claude-sonnet-4-6" for cloud.
task_complexity: "low" | "medium" | "high"
"""
status = self.check()
# No quota info available — assume local only (sovereign default)
if status is None:
return "qwen3:14b" if task_complexity == "high" else "qwen3:8b"
tier = status.recommended_tier
if tier == MetabolicTier.BURST and task_complexity == "high":
return "claude-sonnet-4-6" # Cloud — best quality
elif tier == MetabolicTier.BURST and task_complexity == "medium":
return "qwen3:14b" # Save cloud for truly hard tasks
elif tier == MetabolicTier.ACTIVE:
return "qwen3:14b" # Local 14B — good enough
else: # RESTING
return "qwen3:8b" # Local 8B — conserve everything
def should_use_cloud(self, task_value: str = "normal") -> bool:
"""
Simple yes/no: should this task use cloud API?
task_value: "critical" | "high" | "normal" | "routine"
"""
status = self.check()
if status is None:
return False # No credentials = local only
if task_value == "critical":
return status.seven_day_utilization < 0.95 # Almost always yes
elif task_value == "high":
return status.five_hour_utilization < 0.60
elif task_value == "normal":
return status.five_hour_utilization < 0.30
else: # routine
return False # Never waste cloud on routine
def _time_remaining(reset_at: Optional[str]) -> str:
"""Format time until reset as human-readable string."""
if not reset_at or reset_at == "null":
return "unknown"
try:
reset = datetime.fromisoformat(reset_at.replace("Z", "+00:00"))
now = datetime.now(timezone.utc)
diff = reset - now
if diff.total_seconds() <= 0:
return "resetting now"
hours = int(diff.total_seconds() // 3600)
mins = int((diff.total_seconds() % 3600) // 60)
if hours > 0:
return f"{hours}h {mins}m"
return f"{mins}m"
except (ValueError, TypeError):
return "unknown"
# Module-level singleton
_quota_monitor: Optional[QuotaMonitor] = None
def get_quota_monitor() -> QuotaMonitor:
"""Get or create the quota monitor singleton."""
global _quota_monitor
if _quota_monitor is None:
_quota_monitor = QuotaMonitor()
return _quota_monitor

View File

@@ -32,6 +32,15 @@ except ImportError:
logger = logging.getLogger(__name__)
# Quota monitor — optional, degrades gracefully if unavailable
try:
from infrastructure.claude_quota import QuotaMonitor, get_quota_monitor
_quota_monitor: "QuotaMonitor | None" = get_quota_monitor()
except Exception as _exc: # pragma: no cover
logger.debug("Quota monitor not available: %s", _exc)
_quota_monitor = None
class ProviderStatus(Enum):
"""Health status of a provider."""
@@ -457,6 +466,25 @@ class CascadeRouter:
raise RuntimeError("; ".join(errors))
def _quota_allows_cloud(self, provider: Provider) -> bool:
"""Check quota before routing to a cloud provider.
Uses the metabolic protocol: cloud calls are gated by 5-hour quota.
Returns True (allow cloud) if quota monitor is unavailable or returns None.
"""
if _quota_monitor is None:
return True
try:
# Map provider type to task_value heuristic
task_value = "high" # conservative default
status = _quota_monitor.check()
if status is None:
return True # No credentials — caller decides based on config
return _quota_monitor.should_use_cloud(task_value)
except Exception as exc:
logger.warning("Quota check failed, allowing cloud: %s", exc)
return True
def _is_provider_available(self, provider: Provider) -> bool:
"""Check if a provider should be tried (enabled + circuit breaker)."""
if not provider.enabled:
@@ -510,6 +538,15 @@ class CascadeRouter:
if not self._is_provider_available(provider):
continue
# Metabolic protocol: skip cloud providers when quota is low
if provider.type in ("anthropic", "openai", "grok"):
if not self._quota_allows_cloud(provider):
logger.info(
"Metabolic protocol: skipping cloud provider %s (quota too low)",
provider.name,
)
continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:

View File

@@ -0,0 +1,269 @@
"""Tests for Claude Quota Monitor and Metabolic Protocol."""
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
from infrastructure.claude_quota import (
MetabolicTier,
QuotaMonitor,
QuotaStatus,
_time_remaining,
get_quota_monitor,
)
def _make_status(five_hour: float = 0.0, seven_day: float = 0.0) -> QuotaStatus:
"""Helper: build a QuotaStatus with given utilization values."""
return QuotaStatus(
five_hour_utilization=five_hour,
five_hour_resets_at=None,
seven_day_utilization=seven_day,
seven_day_resets_at=None,
raw_response={},
fetched_at=datetime.now(timezone.utc),
)
class TestMetabolicTierThresholds:
"""Test the three-tier metabolic protocol thresholds."""
def test_burst_when_five_hour_below_50pct(self):
status = _make_status(five_hour=0.49, seven_day=0.10)
assert status.recommended_tier == MetabolicTier.BURST
def test_burst_at_zero_utilization(self):
status = _make_status(five_hour=0.0, seven_day=0.0)
assert status.recommended_tier == MetabolicTier.BURST
def test_active_when_five_hour_at_50pct(self):
status = _make_status(five_hour=0.50, seven_day=0.10)
assert status.recommended_tier == MetabolicTier.ACTIVE
def test_active_when_five_hour_between_50_and_80pct(self):
status = _make_status(five_hour=0.79, seven_day=0.10)
assert status.recommended_tier == MetabolicTier.ACTIVE
def test_active_when_five_hour_at_80pct(self):
# five_hour >= 0.80 but seven_day < 0.80 → ACTIVE (not RESTING)
status = _make_status(five_hour=0.80, seven_day=0.50)
assert status.recommended_tier == MetabolicTier.ACTIVE
def test_resting_when_seven_day_at_80pct(self):
status = _make_status(five_hour=0.30, seven_day=0.80)
assert status.recommended_tier == MetabolicTier.RESTING
def test_resting_when_seven_day_above_80pct(self):
status = _make_status(five_hour=0.10, seven_day=0.95)
assert status.recommended_tier == MetabolicTier.RESTING
def test_resting_when_both_critical(self):
status = _make_status(five_hour=0.90, seven_day=0.90)
assert status.recommended_tier == MetabolicTier.RESTING
def test_seven_day_takes_precedence_over_five_hour(self):
# Weekly quota critical overrides whatever five-hour says
status = _make_status(five_hour=0.10, seven_day=0.85)
assert status.recommended_tier == MetabolicTier.RESTING
class TestQuotaStatusProperties:
"""Test QuotaStatus computed properties."""
def test_five_hour_pct(self):
status = _make_status(five_hour=0.42)
assert status.five_hour_pct == 42
def test_seven_day_pct(self):
status = _make_status(seven_day=0.75)
assert status.seven_day_pct == 75
def test_summary_contains_tier(self):
status = _make_status(five_hour=0.20, seven_day=0.10)
summary = status.summary()
assert "burst" in summary
assert "20%" in summary
def test_five_hour_resets_in_unknown_when_none(self):
status = _make_status()
assert status.five_hour_resets_in == "unknown"
def test_seven_day_resets_in_unknown_when_none(self):
status = _make_status()
assert status.seven_day_resets_in == "unknown"
class TestTimeRemaining:
"""Test _time_remaining helper."""
def test_none_returns_unknown(self):
assert _time_remaining(None) == "unknown"
def test_empty_string_returns_unknown(self):
assert _time_remaining("") == "unknown"
def test_past_time_returns_resetting_now(self):
past = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
assert _time_remaining(past) == "resetting now"
def test_future_time_hours_and_minutes(self):
future = (datetime.now(timezone.utc) + timedelta(hours=2, minutes=15)).isoformat()
result = _time_remaining(future)
assert "2h" in result
# Minutes may vary ±1 due to test execution time
assert "m" in result
def test_future_time_minutes_only(self):
future = (datetime.now(timezone.utc) + timedelta(minutes=45)).isoformat()
result = _time_remaining(future)
assert "h" not in result
# Minutes may vary ±1 due to test execution time
assert "m" in result
def test_z_suffix_handled(self):
future = (datetime.now(timezone.utc) + timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
result = _time_remaining(future)
assert result != "unknown"
class TestQuotaMonitorSelectModel:
"""Test select_model metabolic routing."""
def test_no_quota_high_complexity_returns_14b(self):
monitor = QuotaMonitor()
monitor._get_token = lambda: None
assert monitor.select_model("high") == "qwen3:14b"
def test_no_quota_low_complexity_returns_8b(self):
monitor = QuotaMonitor()
monitor._get_token = lambda: None
assert monitor.select_model("low") == "qwen3:8b"
def test_burst_tier_high_complexity_returns_cloud(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.10)
monitor._cache_seconds = 9999
result = monitor.select_model("high")
assert result == "claude-sonnet-4-6"
def test_burst_tier_medium_complexity_returns_14b(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.10)
monitor._cache_seconds = 9999
result = monitor.select_model("medium")
assert result == "qwen3:14b"
def test_active_tier_returns_14b(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.65, seven_day=0.10)
monitor._cache_seconds = 9999
result = monitor.select_model("high")
assert result == "qwen3:14b"
def test_resting_tier_returns_8b(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.85)
monitor._cache_seconds = 9999
result = monitor.select_model("high")
assert result == "qwen3:8b"
class TestQuotaMonitorShouldUseCloud:
"""Test should_use_cloud gate."""
def test_no_credentials_always_false(self):
monitor = QuotaMonitor()
monitor._get_token = lambda: None
assert monitor.should_use_cloud("critical") is False
def test_critical_task_allowed_when_under_95pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.94)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("critical") is True
def test_critical_task_blocked_when_over_95pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.96)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("critical") is False
def test_high_task_allowed_under_60pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.59, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("high") is True
def test_high_task_blocked_at_60pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.60, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("high") is False
def test_normal_task_allowed_under_30pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.29, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("normal") is True
def test_normal_task_blocked_at_30pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.30, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("normal") is False
def test_routine_task_always_false(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.0, seven_day=0.0)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("routine") is False
class TestQuotaMonitorCaching:
"""Test 30-second TTL cache."""
def test_cached_result_returned_within_ttl(self):
monitor = QuotaMonitor()
fresh_status = _make_status(five_hour=0.10)
monitor._last_status = fresh_status
monitor._cache_seconds = 30
# Should NOT re-fetch — returns cached
with patch.object(monitor, "_get_token", return_value="tok") as mock_tok:
result = monitor.check()
mock_tok.assert_not_called()
assert result is fresh_status
def test_stale_cache_triggers_fetch(self):
monitor = QuotaMonitor()
old_time = datetime.now(timezone.utc) - timedelta(seconds=60)
stale_status = QuotaStatus(
five_hour_utilization=0.10,
five_hour_resets_at=None,
seven_day_utilization=0.10,
seven_day_resets_at=None,
raw_response={},
fetched_at=old_time,
)
monitor._last_status = stale_status
# Token unavailable → returns None (triggers re-fetch path)
with patch.object(monitor, "_get_token", return_value=None):
result = monitor.check()
assert result is None # No credentials after cache miss
class TestGetQuotaMonitorSingleton:
"""Test module-level singleton."""
def test_returns_same_instance(self):
m1 = get_quota_monitor()
m2 = get_quota_monitor()
assert m1 is m2
def test_returns_quota_monitor_instance(self):
monitor = get_quota_monitor()
assert isinstance(monitor, QuotaMonitor)