From 75f523f5c033733377db3d68cd685bc7e720bdb1 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Fri, 6 Mar 2026 14:18:19 -0800 Subject: [PATCH] fix: unknown/custom models get zero cost instead of fake estimates Custom OAI endpoints, self-hosted models, and local inference should NOT show fabricated cost estimates. Changed default pricing from $3/$12 per million tokens to $0/$0 for unrecognized models. - Added _has_known_pricing() to distinguish commercial vs custom models - Models with known pricing show $ amounts; unknown models show 'N/A' - Overview shows asterisk + note when some models lack pricing data - Gateway format adds '(excludes custom/self-hosted models)' note - Added 7 new tests for custom model cost handling --- agent/insights.py | 57 +++++++++++++++++++++------ tests/test_insights.py | 87 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 132 insertions(+), 12 deletions(-) diff --git a/agent/insights.py b/agent/insights.py index 44783ce2a..e355dcf5b 100644 --- a/agent/insights.py +++ b/agent/insights.py @@ -56,12 +56,22 @@ MODEL_PRICING = { "llama-4-scout": {"input": 0.20, "output": 0.30}, } -# Fallback pricing for unknown models -_DEFAULT_PRICING = {"input": 3.00, "output": 12.00} +# Fallback: unknown/custom models get zero cost (we can't assume pricing +# for self-hosted models, custom OAI endpoints, local inference, etc.) +_DEFAULT_PRICING = {"input": 0.0, "output": 0.0} + + +def _has_known_pricing(model_name: str) -> bool: + """Check if a model has known pricing (vs unknown/custom endpoint).""" + return _get_pricing(model_name) is not _DEFAULT_PRICING def _get_pricing(model_name: str) -> Dict[str, float]: - """Look up pricing for a model. Uses fuzzy matching on model name.""" + """Look up pricing for a model. Uses fuzzy matching on model name. + + Returns _DEFAULT_PRICING (zero cost) for unknown/custom models — + we can't assume costs for self-hosted endpoints, local inference, etc. + """ if not model_name: return _DEFAULT_PRICING @@ -290,10 +300,19 @@ class InsightsEngine: total_messages = sum(s.get("message_count") or 0 for s in sessions) # Cost estimation (weighted by model) - total_cost = sum( - _estimate_cost(s.get("model", ""), s.get("input_tokens") or 0, s.get("output_tokens") or 0) - for s in sessions - ) + total_cost = 0.0 + models_with_pricing = set() + models_without_pricing = set() + for s in sessions: + model = s.get("model") or "" + inp = s.get("input_tokens") or 0 + out = s.get("output_tokens") or 0 + total_cost += _estimate_cost(model, inp, out) + display = model.split("/")[-1] if "/" in model else (model or "unknown") + if _has_known_pricing(model): + models_with_pricing.add(display) + else: + models_without_pricing.add(display) # Session duration stats durations = [] @@ -328,6 +347,8 @@ class InsightsEngine: "tool_messages": message_stats.get("tool_messages") or 0, "date_range_start": date_range_start, "date_range_end": date_range_end, + "models_with_pricing": models_with_pricing, + "models_without_pricing": models_without_pricing, } def _compute_model_breakdown(self, sessions: List[Dict]) -> List[Dict]: @@ -350,6 +371,7 @@ class InsightsEngine: d["total_tokens"] += inp + out d["tool_calls"] += s.get("tool_call_count") or 0 d["cost"] += _estimate_cost(model, inp, out) + d["has_pricing"] = _has_known_pricing(model) result = [ {"model": model, **data} @@ -556,7 +578,10 @@ class InsightsEngine: lines.append(f" Sessions: {o['total_sessions']:<12} Messages: {o['total_messages']:,}") lines.append(f" Tool calls: {o['total_tool_calls']:<12,} User messages: {o['user_messages']:,}") lines.append(f" Input tokens: {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}") - lines.append(f" Total tokens: {o['total_tokens']:<12,} Est. cost: ${o['estimated_cost']:.2f}") + cost_str = f"${o['estimated_cost']:.2f}" + if o.get("models_without_pricing"): + cost_str += " *" + lines.append(f" Total tokens: {o['total_tokens']:<12,} Est. cost: {cost_str}") if o["total_hours"] > 0: lines.append(f" Active time: ~{_format_duration(o['total_hours'] * 3600):<11} Avg session: ~{_format_duration(o['avg_session_duration'])}") lines.append(f" Avg msgs/session: {o['avg_messages_per_session']:.1f}") @@ -569,7 +594,13 @@ class InsightsEngine: lines.append(f" {'Model':<30} {'Sessions':>8} {'Tokens':>12} {'Cost':>8}") for m in report["models"]: model_name = m["model"][:28] - lines.append(f" {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,} ${m['cost']:>6.2f}") + if m.get("has_pricing"): + cost_cell = f"${m['cost']:>6.2f}" + else: + cost_cell = " N/A" + lines.append(f" {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,} {cost_cell}") + if o.get("models_without_pricing"): + lines.append(f" * Cost N/A for custom/self-hosted models") lines.append("") # Platform breakdown @@ -650,7 +681,10 @@ class InsightsEngine: # Overview lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}") lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})") - lines.append(f"**Est. cost:** ${o['estimated_cost']:.2f}") + cost_note = "" + if o.get("models_without_pricing"): + cost_note = " _(excludes custom/self-hosted models)_" + lines.append(f"**Est. cost:** ${o['estimated_cost']:.2f}{cost_note}") if o["total_hours"] > 0: lines.append(f"**Active time:** ~{_format_duration(o['total_hours'] * 3600)} | **Avg session:** ~{_format_duration(o['avg_session_duration'])}") lines.append("") @@ -659,7 +693,8 @@ class InsightsEngine: if report["models"]: lines.append("**🤖 Models:**") for m in report["models"][:5]: - lines.append(f" {m['model'][:25]} — {m['sessions']} sessions, {m['total_tokens']:,} tokens, ${m['cost']:.2f}") + cost_str = f"${m['cost']:.2f}" if m.get("has_pricing") else "N/A" + lines.append(f" {m['model'][:25]} — {m['sessions']} sessions, {m['total_tokens']:,} tokens, {cost_str}") lines.append("") # Platforms (if multi-platform) diff --git a/tests/test_insights.py b/tests/test_insights.py index 3cc7c7e80..b6a95c612 100644 --- a/tests/test_insights.py +++ b/tests/test_insights.py @@ -11,6 +11,7 @@ from agent.insights import ( _estimate_cost, _format_duration, _bar_chart, + _has_known_pricing, _DEFAULT_PRICING, ) @@ -145,9 +146,19 @@ class TestPricing: pricing = _get_pricing("anthropic/claude-haiku-future") assert pricing["input"] == 0.80 - def test_unknown_model_returns_default(self): + def test_unknown_model_returns_zero_cost(self): + """Unknown/custom models should NOT have fabricated costs.""" pricing = _get_pricing("totally-unknown-model-xyz") assert pricing == _DEFAULT_PRICING + assert pricing["input"] == 0.0 + assert pricing["output"] == 0.0 + + def test_custom_endpoint_model_zero_cost(self): + """Self-hosted models should return zero cost.""" + for model in ["FP16_Hermes_4.5", "Hermes_4.5_1T_epoch2", "my-local-llama"]: + pricing = _get_pricing(model) + assert pricing["input"] == 0.0, f"{model} should have zero cost" + assert pricing["output"] == 0.0, f"{model} should have zero cost" def test_none_model(self): pricing = _get_pricing(None) @@ -166,6 +177,24 @@ class TestPricing: assert pricing["input"] == 0.15 +class TestHasKnownPricing: + def test_known_commercial_model(self): + assert _has_known_pricing("gpt-4o") is True + assert _has_known_pricing("anthropic/claude-sonnet-4-20250514") is True + assert _has_known_pricing("deepseek-chat") is True + + def test_unknown_custom_model(self): + assert _has_known_pricing("FP16_Hermes_4.5") is False + assert _has_known_pricing("my-custom-model") is False + assert _has_known_pricing("") is False + assert _has_known_pricing(None) is False + + def test_heuristic_matched_models(self): + """Models matched by keyword heuristics should be considered known.""" + assert _has_known_pricing("some-opus-model") is True + assert _has_known_pricing("future-sonnet-v2") is True + + class TestEstimateCost: def test_basic_cost(self): # gpt-4o: 2.50/M input, 10.00/M output @@ -448,6 +477,19 @@ class TestTerminalFormatting: assert "█" in text # Bar chart characters + def test_terminal_format_shows_na_for_custom_models(self, db): + """Custom models should show N/A instead of fake cost.""" + db.create_session(session_id="s1", source="cli", model="my-custom-model") + db.update_token_counts("s1", input_tokens=1000, output_tokens=500) + db._conn.commit() + + engine = InsightsEngine(db) + report = engine.generate(days=30) + text = engine.format_terminal(report) + + assert "N/A" in text + assert "custom/self-hosted" in text + class TestGatewayFormatting: def test_gateway_format_is_shorter(self, populated_db): @@ -525,6 +567,49 @@ class TestEdgeCases: models = report["models"] assert len(models) == 1 assert models[0]["model"] == "unknown" + assert models[0]["has_pricing"] is False + + def test_custom_model_shows_zero_cost(self, db): + """Custom/self-hosted models should show $0 cost, not fake estimates.""" + db.create_session(session_id="s1", source="cli", model="FP16_Hermes_4.5") + db.update_token_counts("s1", input_tokens=100000, output_tokens=50000) + db._conn.commit() + + engine = InsightsEngine(db) + report = engine.generate(days=30) + assert report["overview"]["estimated_cost"] == 0.0 + assert "FP16_Hermes_4.5" in report["overview"]["models_without_pricing"] + + models = report["models"] + custom = next(m for m in models if m["model"] == "FP16_Hermes_4.5") + assert custom["cost"] == 0.0 + assert custom["has_pricing"] is False + + def test_mixed_commercial_and_custom_models(self, db): + """Mix of commercial and custom models: only commercial ones get costs.""" + db.create_session(session_id="s1", source="cli", model="gpt-4o") + db.update_token_counts("s1", input_tokens=10000, output_tokens=5000) + db.create_session(session_id="s2", source="cli", model="my-local-llama") + db.update_token_counts("s2", input_tokens=10000, output_tokens=5000) + db._conn.commit() + + engine = InsightsEngine(db) + report = engine.generate(days=30) + + # Cost should only come from gpt-4o, not from the custom model + overview = report["overview"] + assert overview["estimated_cost"] > 0 + assert "gpt-4o" in overview["models_with_pricing"] + assert "my-local-llama" in overview["models_without_pricing"] + + # Verify individual model entries + gpt = next(m for m in report["models"] if m["model"] == "gpt-4o") + assert gpt["has_pricing"] is True + assert gpt["cost"] > 0 + + llama = next(m for m in report["models"] if m["model"] == "my-local-llama") + assert llama["has_pricing"] is False + assert llama["cost"] == 0.0 def test_single_session_streak(self, db): """Single session should have streak of 0 or 1."""