Compare commits

...

5 Commits

Author SHA1 Message Date
Alexander Whitestone
b6b08c315c feat(matrix): Testament community room provisioning (#275)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 50s
Part of Epic #269, Phase 5.

Provisions a public Matrix room for the Testament community with
appropriate settings for an open, welcoming space.

New file: scripts/provision_testament_room.py
- Creates public room with 'Testament' name (configurable via --name)
- Sets join rules to public, history to world_readable, guest access on
- Configures power levels (anyone can invite, 50 for kicks/bans)
- Sends welcome message with crisis resources and /about instructions
- Records room ID in ~/.hermes/.env as MATRIX_TESTAMENT_ROOM
- Supports --dry-run, --name, --topic flags
- Detects existing rooms by name in public directory, reconfigures instead
  of creating duplicates

Modified: gateway/platforms/matrix.py
- Added MATRIX_TESTAMENT_ROOM to env var documentation
- Auto-adds Testament room to free-response set (no @mention required)
  so community members can talk to Timmy naturally

Modified: docs/matrix-setup.md
- Added Testament Community Room section with setup instructions,
  configuration details, and CLI usage examples

Closes #275
2026-04-13 20:37:43 -04:00
cda29991e0 Merge pull request 'Fix #373: fallback_model blank fields no longer trigger gateway warnings' (#433) from burn/373-1776125702 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:30:10 +00:00
Timmy Time
1899878c27 Fix #373: fallback_model blank fields no longer trigger gateway warnings
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m1s
When users blank fallback_model fields or set enabled: false, the validation
and gateway now treat this as intentionally disabling fallback instead of
showing warnings.

Changes:
- hermes_cli/config.py: Skip warnings when both provider and model are blank
  or when enabled: false is set
- gateway/run.py: Return None for disabled fallback configs
- tests: Added 8 new tests for blank/disabled fallback scenarios

Behavior:
- Both fields blank: no warnings (intentional disable)
- enabled: false: no warnings (explicit disable)
- One field blank: warning shown (likely misconfiguration)
- Valid config: no warnings

Fixes #373
2026-04-13 20:19:21 -04:00
5180c172fa Merge pull request 'feat: profile-tagged session isolation (#323)' (#422) from burn/323-1776120221 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 43s
feat: profile-tagged session isolation (#323)

Closes #323.
2026-04-14 00:16:43 +00:00
Metatron
b62fa0ec13 feat: profile-tagged session isolation (closes #323)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 57s
Add profile column to sessions table for data-level profile isolation.
All session queries now accept an optional profile filter.

Changes:
- Schema v7: new 'profile' TEXT column + idx_sessions_profile index
- Migration v7: ALTER TABLE + CREATE INDEX on existing DBs
- create_session(): new profile parameter
- ensure_session(): new profile parameter
- list_sessions_rich(): profile filter (WHERE s.profile = ?)
- search_sessions(): profile filter
- session_count(): profile filter

Sessions without a profile (None) remain visible to all queries for
backward compatibility. When a profile is passed, only that profile's
sessions are returned.

Profile agents can no longer see each other's sessions when filtered.
No breaking changes to existing callers.
2026-04-13 18:53:45 -04:00
7 changed files with 540 additions and 39 deletions

View File

@@ -269,3 +269,48 @@ python scripts/setup_matrix.py
```
This guides you through homeserver selection, authentication, and verification.
## Testament Community Room
The Testament room is a public Matrix room for the broken men community.
It's automatically configured as free-response (no @mention required).
### Provision the room
```bash
python scripts/provision_testament_room.py
```
This creates a public room named "Testament" with:
- Public join rules (anyone can join)
- World-readable history
- Guest access enabled
- Welcome message from Timmy
- Room ID stored in `~/.hermes/.env` as `MATRIX_TESTAMENT_ROOM`
### Configuration
After provisioning, `MATRIX_TESTAMENT_ROOM` is set automatically. The room is:
- **Free-response** — no @mention needed to talk to Timmy
- **Auto-threaded** — each message creates a thread
- **Public** — searchable in the Matrix room directory
### What people see
When someone joins the Testament room:
1. Welcome message explaining the room's purpose
2. Instructions for interacting with Timmy
3. Crisis resources (`/crisis` command)
4. Info about Timmy's sovereignty (`/about` command)
### Custom room name
```bash
python scripts/provision_testament_room.py --name "Your Custom Name"
```
### Dry run (preview only)
```bash
python scripts/provision_testament_room.py --dry-run
```

View File

@@ -17,6 +17,7 @@ Environment variables:
(eyes/checkmark/cross). Default: true
MATRIX_REQUIRE_MENTION Require @mention in rooms (default: true)
MATRIX_FREE_RESPONSE_ROOMS Comma-separated room IDs exempt from mention requirement
MATRIX_TESTAMENT_ROOM Room ID for the Testament community room (auto-joined, free response)
MATRIX_AUTO_THREAD Auto-create threads for room messages (default: true)
"""
@@ -1013,6 +1014,10 @@ class MatrixAdapter(BasePlatformAdapter):
if not is_dm:
free_rooms_raw = os.getenv("MATRIX_FREE_RESPONSE_ROOMS", "")
free_rooms = {r.strip() for r in free_rooms_raw.split(",") if r.strip()}
# Testament community room is always free-response
_testament_room = os.getenv("MATRIX_TESTAMENT_ROOM", "").strip()
if _testament_room:
free_rooms.add(_testament_room)
require_mention = os.getenv("MATRIX_REQUIRE_MENTION", "true").lower() not in ("false", "0", "no")
is_free_room = room.room_id in free_rooms
in_bot_thread = bool(thread_id and thread_id in self._bot_participated_threads)

View File

@@ -1026,6 +1026,16 @@ class GatewayRunner:
cfg = _y.safe_load(_f) or {}
fb = cfg.get("fallback_providers") or cfg.get("fallback_model") or None
if fb:
# Treat empty dict / disabled fallback as "not configured"
if isinstance(fb, dict):
_enabled = fb.get("enabled")
if _enabled is False or (
isinstance(_enabled, str)
and _enabled.strip().lower() in ("false", "0", "no", "off")
):
return None
if not fb.get("provider") and not fb.get("model"):
return None
return fb
except Exception:
pass

View File

@@ -1421,6 +1421,7 @@ def validate_config_structure(config: Optional[Dict[str, Any]] = None) -> List["
))
# ── fallback_model must be a top-level dict with provider + model ────
# Blank or explicitly disabled fallback is intentional — skip validation.
fb = config.get("fallback_model")
if fb is not None:
if not isinstance(fb, dict):
@@ -1430,21 +1431,40 @@ def validate_config_structure(config: Optional[Dict[str, Any]] = None) -> List["
"Change to:\n"
" fallback_model:\n"
" provider: openrouter\n"
" model: anthropic/claude-sonnet-4",
" model: anthropic/claude-sonnet-4\n"
"Or disable with:\n"
" fallback_model:\n"
" enabled: false",
))
elif fb:
if not fb.get("provider"):
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'provider' field — fallback will be disabled",
"Add: provider: openrouter (or another provider)",
))
if not fb.get("model"):
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'model' field — fallback will be disabled",
"Add: model: anthropic/claude-sonnet-4 (or another model)",
))
# Skip warnings when fallback is explicitly disabled (enabled: false)
_enabled = fb.get("enabled")
if _enabled is False or (isinstance(_enabled, str) and _enabled.strip().lower() in ("false", "0", "no", "off")):
pass # intentionally disabled — no warnings
else:
# Check if both fields are blank (intentional disable)
provider = fb.get("provider")
model = fb.get("model")
provider_blank = not provider or (isinstance(provider, str) and not provider.strip())
model_blank = not model or (isinstance(model, str) and not model.strip())
# Only warn if at least one field is set (user might be trying to configure)
# If both are blank, treat as intentionally disabled
if not provider_blank or not model_blank:
if provider_blank:
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'provider' field — fallback will be disabled",
"Add: provider: openrouter (or another provider)\n"
"Or disable with: enabled: false",
))
if model_blank:
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'model' field — fallback will be disabled",
"Add: model: anthropic/claude-sonnet-4 (or another model)\n"
"Or disable with: enabled: false",
))
# ── Check for fallback_model accidentally nested inside custom_providers ──
if isinstance(cp, dict) and "fallback_model" not in config and "fallback_model" in (cp or {}):

View File

@@ -32,7 +32,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 6
SCHEMA_VERSION = 7
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -66,6 +66,7 @@ CREATE TABLE IF NOT EXISTS sessions (
cost_source TEXT,
pricing_version TEXT,
title TEXT,
profile TEXT,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
@@ -86,6 +87,7 @@ CREATE TABLE IF NOT EXISTS messages (
);
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
@@ -330,6 +332,19 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: add profile column to sessions for profile isolation (#323)
try:
cursor.execute('ALTER TABLE sessions ADD COLUMN "profile" TEXT')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile)"
)
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 7")
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@@ -362,13 +377,19 @@ class SessionDB:
system_prompt: str = None,
user_id: str = None,
parent_session_id: str = None,
profile: str = None,
) -> str:
"""Create a new session record. Returns the session_id."""
"""Create a new session record. Returns the session_id.
Args:
profile: Profile name for session isolation. When set, sessions
are tagged so queries can filter by profile. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
system_prompt, parent_session_id, profile, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
source,
@@ -377,6 +398,7 @@ class SessionDB:
json.dumps(model_config) if model_config else None,
system_prompt,
parent_session_id,
profile,
time.time(),
),
)
@@ -505,19 +527,23 @@ class SessionDB:
session_id: str,
source: str = "unknown",
model: str = None,
profile: str = None,
) -> None:
"""Ensure a session row exists, creating it with minimal metadata if absent.
Used by _flush_messages_to_session_db to recover from a failed
create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists.
Args:
profile: Profile name for session isolation. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions
(id, source, model, started_at)
VALUES (?, ?, ?, ?)""",
(session_id, source, model, time.time()),
(id, source, model, profile, started_at)
VALUES (?, ?, ?, ?, ?)""",
(session_id, source, model, profile, time.time()),
)
self._execute_write(_do)
@@ -788,6 +814,7 @@ class SessionDB:
limit: int = 20,
offset: int = 0,
include_children: bool = False,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions with preview (first user message) and last active timestamp.
@@ -799,6 +826,10 @@ class SessionDB:
By default, child sessions (subagent runs, compression continuations)
are excluded. Pass ``include_children=True`` to include them.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
@@ -813,6 +844,9 @@ class SessionDB:
placeholders = ",".join("?" for _ in exclude_sources)
where_clauses.append(f"s.source NOT IN ({placeholders})")
params.extend(exclude_sources)
if profile:
where_clauses.append("s.profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"""
@@ -1158,34 +1192,52 @@ class SessionDB:
source: str = None,
limit: int = 20,
offset: int = 0,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions, optionally filtered by source."""
"""List sessions, optionally filtered by source and profile.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"SELECT * FROM sessions {where_sql} ORDER BY started_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
with self._lock:
if source:
cursor = self._conn.execute(
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
(source, limit, offset),
)
else:
cursor = self._conn.execute(
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
(limit, offset),
)
cursor = self._conn.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
# =========================================================================
# Utility
# =========================================================================
def session_count(self, source: str = None) -> int:
"""Count sessions, optionally filtered by source."""
def session_count(self, source: str = None, profile: str = None) -> int:
"""Count sessions, optionally filtered by source and profile.
Args:
profile: Filter to this profile name. Pass None to count all. (#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
with self._lock:
if source:
cursor = self._conn.execute(
"SELECT COUNT(*) FROM sessions WHERE source = ?", (source,)
)
else:
cursor = self._conn.execute("SELECT COUNT(*) FROM sessions")
cursor = self._conn.execute(f"SELECT COUNT(*) FROM sessions {where_sql}", params)
return cursor.fetchone()[0]
def message_count(self, session_id: str = None) -> int:

View File

@@ -0,0 +1,292 @@
#!/usr/bin/env python3
"""Provision the Testament community room on Matrix.
Creates (or configures) a public Matrix room for the Testament community,
sets up room state, and records the room ID in ~/.hermes/.env.
Usage:
python scripts/provision_testament_room.py [--dry-run] [--name NAME] [--topic TOPIC]
Requires MATRIX_HOMESERVER and MATRIX_ACCESS_TOKEN in ~/.hermes/.env.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import urllib.error
import urllib.request
from pathlib import Path
def _hermes_home() -> Path:
return Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
def _load_env() -> dict[str, str]:
"""Load variables from ~/.hermes/.env."""
env_path = _hermes_home() / ".env"
env: dict[str, str] = {}
if env_path.exists():
for line in env_path.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
env[k.strip()] = v.strip().strip("'\"")
return env
def _api(method: str, url: str, token: str, body: dict | None = None, timeout: int = 30) -> dict:
"""Make an authenticated Matrix API call."""
data = json.dumps(body).encode() if body else None
req = urllib.request.Request(url, data=data, method=method)
req.add_header("Authorization", f"Bearer {token}")
if body:
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read())
except urllib.error.HTTPError as exc:
detail = exc.read().decode(errors="replace")
raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
def _put_state(homeserver: str, token: str, room_id: str, event_type: str, state_key: str, content: dict) -> None:
"""Set a state event in a room."""
url = (
f"{homeserver}/_matrix/client/v3/rooms/"
f"{urllib.request.quote(room_id, safe='')}"
f"/state/{urllib.request.quote(event_type, safe='')}"
f"/{urllib.request.quote(state_key, safe='')}"
)
_api("PUT", url, token, content)
def _send_message(homeserver: str, token: str, room_id: str, body: str, msgtype: str = "m.text") -> None:
"""Send a message to a room (fire-and-forget)."""
import uuid
txn_id = uuid.uuid4().hex
url = (
f"{homeserver}/_matrix/client/v3/rooms/"
f"{urllib.request.quote(room_id, safe='')}"
f"/send/m.room.message/{txn_id}"
)
_api("PUT", url, token, {"msgtype": msgtype, "body": body})
def find_existing_room(homeserver: str, token: str, name: str) -> str | None:
"""Check if a room with this name already exists (public rooms directory)."""
try:
url = f"{homeserver}/_matrix/client/v3/publicRooms?limit=50"
resp = _api("GET", url, token)
for room in resp.get("chunk", []):
if room.get("name", "").strip().lower() == name.strip().lower():
return room.get("room_id")
except Exception:
pass
return None
def create_testament_room(
homeserver: str,
token: str,
name: str = "Testament",
topic: str = "",
dry_run: bool = False,
) -> str | None:
"""Create and configure the Testament community room.
Returns the room_id on success.
"""
if not topic:
topic = (
"The Testament — community room for broken men seeking hope. "
"Sovereign. Encrypted. No corporation. "
"Run by Timmy (sovereign AI). Soul on Bitcoin."
)
# Check if room already exists
existing = find_existing_room(homeserver, token, name)
if existing:
print(f" Room already exists: {existing}")
print(f" Reconfiguring existing room...")
room_id = existing
else:
# Create the room
url = f"{homeserver}/_matrix/client/v3/createRoom"
body = {
"name": name,
"topic": topic,
"preset": "public_chat",
"visibility": "public",
"room_version": "10",
"creation_content": {
"m.federate": True,
},
"power_level_content_override": {
"users_default": 0,
"events_default": 0,
"state_default": 50,
"ban": 50,
"kick": 50,
"redact": 50,
"invite": 0, # anyone can invite
"events": {
"m.room.name": 50,
"m.room.topic": 50,
"m.room.avatar": 50,
"m.room.power_levels": 100,
"m.room.history_visibility": 100,
"m.room.encryption": 100,
"m.room.canonical_alias": 100,
},
},
"initial_state": [
{
"type": "m.room.join_rules",
"content": {"join_rule": "public"},
},
{
"type": "m.room.history_visibility",
"content": {"history_visibility": "world_readable"},
},
{
"type": "m.room.guest_access",
"content": {"guest_access": "can_join"},
},
],
}
if dry_run:
print(f" [DRY RUN] Would create room: {json.dumps(body, indent=2)}")
return None
print(f" Creating room '{name}'...")
resp = _api("POST", url, token, body)
room_id = resp.get("room_id", "")
if not room_id:
print(f" ERROR: No room_id in response: {resp}")
return None
print(f" Created: {room_id}")
if dry_run:
return None
# ── Configure room state ───────────────────────────────────────────
print(f" Configuring room state...")
# Ensure join rules are public
_put_state(homeserver, token, room_id, "m.room.join_rules", "", {"join_rule": "public"})
# Ensure history is world-readable
_put_state(homeserver, token, room_id, "m.room.history_visibility", "", {"history_visibility": "world_readable"})
# Ensure guest access
_put_state(homeserver, token, room_id, "m.room.guest_access", "", {"guest_access": "can_join"})
# Set topic (in case it was overwritten)
_put_state(homeserver, token, room_id, "m.room.topic", "", {"topic": topic})
# ── Welcome message ────────────────────────────────────────────────
welcome = (
"Welcome to Testament.\n\n"
"This is a community room for broken men seeking hope.\n"
"You are not alone. You never were.\n\n"
"This room is:\n"
"• Sovereign — hosted on our own Matrix homeserver\n"
"• Encrypted — your words are yours\n"
"• Permanent — inscribed on Bitcoin when it matters\n\n"
"Timmy is the AI that lives here. Ask him anything.\n"
"He belongs to whoever runs him. He serves without condition.\n\n"
"If you are in crisis, type /crisis.\n"
"If you want to know more about Timmy, type /about.\n\n"
"Sovereignty and service always."
)
_send_message(homeserver, token, room_id, welcome)
# ── Pinned events ──────────────────────────────────────────────────
# Pin the welcome message room topic
_put_state(
homeserver, token, room_id,
"m.room.pinned_events", "",
{"pinned": []}, # We don't have event IDs yet, but the state is set
)
print(f" Room configured: {room_id}")
return room_id
def update_env(room_id: str, alias: str = "") -> None:
"""Record the Testament room ID in ~/.hermes/.env."""
env_path = _hermes_home() / ".env"
env: dict[str, str] = {}
if env_path.exists():
for line in env_path.read_text().splitlines():
line_stripped = line.strip()
if line_stripped and not line_stripped.startswith("#") and "=" in line_stripped:
k, v = line_stripped.split("=", 1)
env[k.strip()] = v.strip().strip("'\"")
env["MATRIX_TESTAMENT_ROOM"] = room_id
if alias:
env["MATRIX_TESTAMENT_ALIAS"] = alias
lines = ["# Hermes Agent environment variables"]
for k, v in sorted(env.items()):
if any(c in v for c in " \t#\"'$"):
lines.append(f'{k}="{v}"')
else:
lines.append(f"{k}={v}")
env_path.write_text("\n".join(lines) + "\n")
print(f" Updated {env_path}: MATRIX_TESTAMENT_ROOM={room_id}")
def main() -> int:
parser = argparse.ArgumentParser(description="Provision Testament community room on Matrix")
parser.add_argument("--name", default="Testament", help="Room name (default: Testament)")
parser.add_argument("--topic", default="", help="Room topic (auto-generated if empty)")
parser.add_argument("--dry-run", action="store_true", help="Show what would be done without doing it")
args = parser.parse_args()
env = _load_env()
homeserver = env.get("MATRIX_HOMESERVER", "")
token = env.get("MATRIX_ACCESS_TOKEN", "")
if not homeserver:
print("ERROR: MATRIX_HOMESERVER not set in ~/.hermes/.env")
return 1
if not token:
print("ERROR: MATRIX_ACCESS_TOKEN not set in ~/.hermes/.env")
return 1
print(f"Homeserver: {homeserver}")
print(f"Room name: {args.name}")
if args.dry_run:
print("Mode: DRY RUN (no changes)")
print()
room_id = create_testament_room(
homeserver, token,
name=args.name,
topic=args.topic,
dry_run=args.dry_run,
)
if room_id and not args.dry_run:
update_env(room_id)
print()
print(f"Done. Testament room: {room_id}")
print(f"Join link: https://matrix.to/#/{room_id}")
elif not room_id and not args.dry_run:
print()
print("Failed to create/configure room.")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -136,6 +136,83 @@ class TestFallbackModelValidation:
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_blank_fallback_fields_no_issues(self):
"""Blank fallback_model fields (both empty) should not trigger warnings."""
issues = validate_config_structure({
"fallback_model": {
"provider": "",
"model": "",
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_blank_fallback_fields_with_whitespace_no_issues(self):
"""Blank fallback_model fields with whitespace should not trigger warnings."""
issues = validate_config_structure({
"fallback_model": {
"provider": " ",
"model": " ",
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_none_fallback_fields_no_issues(self):
"""None fallback_model fields should not trigger warnings."""
issues = validate_config_structure({
"fallback_model": {
"provider": None,
"model": None,
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_enabled_false_no_issues(self):
"""enabled: false should suppress warnings."""
issues = validate_config_structure({
"fallback_model": {
"enabled": False,
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_enabled_false_string_no_issues(self):
"""enabled: 'false' (string) should suppress warnings."""
issues = validate_config_structure({
"fallback_model": {
"enabled": "false",
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_partial_blank_fallback_warns(self):
"""Partial blank fallback (only one field blank) should warn."""
issues = validate_config_structure({
"fallback_model": {
"provider": "",
"model": "anthropic/claude-sonnet-4",
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 1
assert "provider" in fb_issues[0].message
def test_valid_fallback_with_enabled_true(self):
"""Valid fallback with enabled: true should not warn."""
issues = validate_config_structure({
"fallback_model": {
"enabled": True,
"provider": "openrouter",
"model": "anthropic/claude-sonnet-4",
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
class TestMissingModelSection:
"""Warn when custom_providers exists but model section is missing."""