1
0

refactor: centralize config & harden security (#141)

* feat: upgrade primary model from llama3.1:8b to qwen2.5:14b

- Swap OLLAMA_MODEL_PRIMARY to qwen2.5:14b for better reasoning
- llama3.1:8b-instruct becomes fallback
- Update .env default and README quick start
- Fix hardcoded model assertions in tests

qwen2.5:14b provides significantly better multi-step reasoning
and tool calling reliability while still running locally on
modest hardware. The 8B model remains as automatic fallback.

* security: centralize config, harden uploads, fix silent exceptions

- Add 9 pydantic Settings fields (skip_embeddings, disable_csrf,
  rqlite_url, brain_source, brain_db_path, csrf_cookie_secure,
  chat_api_max_body_bytes, timmy_test_mode) to centralize env-var access
- Migrate 8 os.environ.get() calls across 5 source files to use
  `from config import settings` per project convention
- Add path traversal defense-in-depth to file upload endpoint
- Add 1MB request body size limit to chat API
- Make CSRF cookie secure flag configurable via settings
- Replace 2 silent `except: pass` blocks with debug logging in session.py
- Remove unused `import os` from brain/memory.py and csrf.py
- Update 5 CSRF test fixtures to patch settings instead of os.environ

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Trip T <trip@local>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Whitestone
2026-03-07 18:49:37 -05:00
committed by GitHub
parent cdd3e1a90b
commit b615595100
14 changed files with 80 additions and 56 deletions

View File

@@ -27,7 +27,8 @@ class BrainClient:
""" """
def __init__(self, rqlite_url: Optional[str] = None, node_id: Optional[str] = None): def __init__(self, rqlite_url: Optional[str] = None, node_id: Optional[str] = None):
self.rqlite_url = rqlite_url or os.environ.get("RQLITE_URL", DEFAULT_RQLITE_URL) from config import settings
self.rqlite_url = rqlite_url or settings.rqlite_url or DEFAULT_RQLITE_URL
self.node_id = node_id or f"{socket.gethostname()}-{os.getpid()}" self.node_id = node_id or f"{socket.gethostname()}-{os.getpid()}"
self.source = self._detect_source() self.source = self._detect_source()
self._client = httpx.AsyncClient(timeout=30) self._client = httpx.AsyncClient(timeout=30)
@@ -36,7 +37,8 @@ class BrainClient:
"""Detect what component is using the brain.""" """Detect what component is using the brain."""
# Could be 'timmy', 'zeroclaw', 'worker', etc. # Could be 'timmy', 'zeroclaw', 'worker', etc.
# For now, infer from context or env # For now, infer from context or env
return os.environ.get("BRAIN_SOURCE", "default") from config import settings
return settings.brain_source
# ────────────────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────────────────
# Memory Operations # Memory Operations

View File

@@ -29,7 +29,6 @@ from __future__ import annotations
import json import json
import logging import logging
import os
import sqlite3 import sqlite3
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -48,9 +47,9 @@ _SCHEMA_VERSION = 1
def _get_db_path() -> Path: def _get_db_path() -> Path:
"""Get the brain database path from env or default.""" """Get the brain database path from env or default."""
env_path = os.environ.get("BRAIN_DB_PATH") from config import settings
if env_path: if settings.brain_db_path:
return Path(env_path) return Path(settings.brain_db_path)
return _DEFAULT_DB_PATH return _DEFAULT_DB_PATH
@@ -75,7 +74,8 @@ class UnifiedMemory:
# Auto-detect: use rqlite if RQLITE_URL is set, otherwise local SQLite # Auto-detect: use rqlite if RQLITE_URL is set, otherwise local SQLite
if use_rqlite is None: if use_rqlite is None:
use_rqlite = bool(os.environ.get("RQLITE_URL")) from config import settings as _settings
use_rqlite = bool(_settings.rqlite_url)
self._use_rqlite = use_rqlite self._use_rqlite = use_rqlite
if not self._use_rqlite: if not self._use_rqlite:
@@ -106,7 +106,8 @@ class UnifiedMemory:
def _get_embedder(self): def _get_embedder(self):
"""Lazy-load the embedding model.""" """Lazy-load the embedding model."""
if self._embedder is None: if self._embedder is None:
if os.environ.get("TIMMY_SKIP_EMBEDDINGS") == "1": from config import settings as _settings
if _settings.timmy_skip_embeddings:
return None return None
try: try:
from brain.embeddings import LocalEmbedder from brain.embeddings import LocalEmbedder

View File

@@ -106,6 +106,29 @@ class Settings(BaseSettings):
# In production, security settings are strictly enforced. # In production, security settings are strictly enforced.
timmy_env: Literal["development", "production"] = "development" timmy_env: Literal["development", "production"] = "development"
# ── Test / Diagnostics ─────────────────────────────────────────────
# Skip loading heavy embedding models (for tests / low-memory envs).
timmy_skip_embeddings: bool = False
# Disable CSRF middleware entirely (for tests).
timmy_disable_csrf: bool = False
# Mark the process as running in test mode.
timmy_test_mode: bool = False
# ── Brain / rqlite ─────────────────────────────────────────────────
# URL of the local rqlite node for distributed memory.
# Empty string means rqlite is not configured.
rqlite_url: str = ""
# Source identifier for brain memory entries.
brain_source: str = "default"
# Path override for the local brain SQLite database.
brain_db_path: str = ""
# ── Security Tuning ───────────────────────────────────────────────
# Set to True in production to mark CSRF cookies as Secure (HTTPS only).
csrf_cookie_secure: bool = False
# Maximum size in bytes for chat API request bodies.
chat_api_max_body_bytes: int = 1_048_576 # 1 MB
# ── Self-Modification ────────────────────────────────────────────── # ── Self-Modification ──────────────────────────────────────────────
# Enable self-modification capabilities. When enabled, the agent can # Enable self-modification capabilities. When enabled, the agent can
# edit its own source code, run tests, and commit changes. # edit its own source code, run tests, and commit changes.

View File

@@ -162,7 +162,8 @@ async def _discord_token_watcher() -> None:
if discord_bot.state.name == "CONNECTED": if discord_bot.state.name == "CONNECTED":
return # Already running — stop watching return # Already running — stop watching
# 1. Check live environment variable # 1. Check live environment variable (intentionally uses os.environ,
# not settings, because this polls for runtime hot-reload changes)
token = os.environ.get("DISCORD_TOKEN", "") token = os.environ.get("DISCORD_TOKEN", "")
# 2. Re-read .env file for hot-reload # 2. Re-read .env file for hot-reload

View File

@@ -7,7 +7,6 @@ to protect state-changing endpoints from cross-site request attacks.
import secrets import secrets
import hmac import hmac
import hashlib import hashlib
import os
from typing import Callable, Optional from typing import Callable, Optional
from functools import wraps from functools import wraps
@@ -125,7 +124,8 @@ class CSRFMiddleware(BaseHTTPMiddleware):
For unsafe methods: Validate the CSRF token. For unsafe methods: Validate the CSRF token.
""" """
# Bypass CSRF if explicitly disabled (e.g. in tests) # Bypass CSRF if explicitly disabled (e.g. in tests)
if os.environ.get("TIMMY_DISABLE_CSRF") == "1": from config import settings
if settings.timmy_disable_csrf:
return await call_next(request) return await call_next(request)
# Get existing CSRF token from cookie # Get existing CSRF token from cookie
@@ -142,7 +142,7 @@ class CSRFMiddleware(BaseHTTPMiddleware):
key=self.cookie_name, key=self.cookie_name,
value=new_token, value=new_token,
httponly=False, # Must be readable by JavaScript httponly=False, # Must be readable by JavaScript
secure=False, # Set to True in production with HTTPS secure=settings.csrf_cookie_secure,
samesite="Lax", samesite="Lax",
max_age=86400 # 24 hours max_age=86400 # 24 hours
) )

View File

@@ -14,6 +14,7 @@ import logging
import os import os
import uuid import uuid
from datetime import datetime from datetime import datetime
from pathlib import Path
from fastapi import APIRouter, File, HTTPException, Request, UploadFile from fastapi import APIRouter, File, HTTPException, Request, UploadFile
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
@@ -42,6 +43,11 @@ async def api_chat(request: Request):
Response: Response:
{"reply": "...", "timestamp": "HH:MM:SS"} {"reply": "...", "timestamp": "HH:MM:SS"}
""" """
# Enforce request body size limit
content_length = request.headers.get("content-length")
if content_length and int(content_length) > settings.chat_api_max_body_bytes:
return JSONResponse(status_code=413, content={"error": "Request body too large"})
try: try:
body = await request.json() body = await request.json()
except Exception: except Exception:
@@ -117,6 +123,12 @@ async def api_upload(file: UploadFile = File(...)):
stored_name = f"{suffix}-{safe_name}" stored_name = f"{suffix}-{safe_name}"
file_path = os.path.join(_UPLOAD_DIR, stored_name) file_path = os.path.join(_UPLOAD_DIR, stored_name)
# Defense-in-depth: verify resolved path stays within upload directory
resolved = Path(file_path).resolve()
upload_root = Path(_UPLOAD_DIR).resolve()
if not str(resolved).startswith(str(upload_root)):
raise HTTPException(status_code=400, detail="Invalid file name")
contents = await file.read() contents = await file.read()
if len(contents) > _MAX_UPLOAD_SIZE: if len(contents) > _MAX_UPLOAD_SIZE:
raise HTTPException(status_code=413, detail="File too large (max 50 MB)") raise HTTPException(status_code=413, detail="File too large (max 50 MB)")

View File

@@ -29,9 +29,9 @@ def _get_model():
if _model is not None: if _model is not None:
return _model return _model
import os from config import settings
# In test mode or low-memory environments, skip embedding model load # In test mode or low-memory environments, skip embedding model load
if os.environ.get("TIMMY_SKIP_EMBEDDINGS") == "1": if settings.timmy_skip_embeddings:
_has_embeddings = False _has_embeddings = False
return None return None

View File

@@ -37,8 +37,8 @@ def _get_embedding_model():
"""Lazy-load embedding model.""" """Lazy-load embedding model."""
global EMBEDDING_MODEL global EMBEDDING_MODEL
if EMBEDDING_MODEL is None: if EMBEDDING_MODEL is None:
import os from config import settings
if os.environ.get("TIMMY_SKIP_EMBEDDINGS") == "1": if settings.timmy_skip_embeddings:
EMBEDDING_MODEL = False EMBEDDING_MODEL = False
return EMBEDDING_MODEL return EMBEDDING_MODEL
try: try:

View File

@@ -96,8 +96,8 @@ def reset_session(session_id: Optional[str] = None) -> None:
try: try:
from timmy.conversation import conversation_manager from timmy.conversation import conversation_manager
conversation_manager.clear_context(sid) conversation_manager.clear_context(sid)
except Exception: except Exception as exc:
pass # Graceful degradation logger.debug("Session: context clear failed for %s: %s", sid, exc)
def _extract_facts(message: str) -> None: def _extract_facts(message: str) -> None:
@@ -114,8 +114,8 @@ def _extract_facts(message: str) -> None:
from timmy.memory_system import memory_system from timmy.memory_system import memory_system
memory_system.update_user_fact("Name", name) memory_system.update_user_fact("Name", name)
logger.info("Session: Learned user name: %s", name) logger.info("Session: Learned user name: %s", name)
except Exception: except Exception as exc:
pass logger.debug("Session: fact persist failed: %s", exc)
except Exception as exc: except Exception as exc:
logger.debug("Session: Fact extraction skipped: %s", exc) logger.debug("Session: Fact extraction skipped: %s", exc)

View File

@@ -12,14 +12,11 @@ class TestCSRFMiddleware:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def enable_csrf(self): def enable_csrf(self):
"""Re-enable CSRF for these tests.""" """Re-enable CSRF for these tests."""
import os from config import settings
old_val = os.environ.get("TIMMY_DISABLE_CSRF") original = settings.timmy_disable_csrf
os.environ["TIMMY_DISABLE_CSRF"] = "0" settings.timmy_disable_csrf = False
yield yield
if old_val is not None: settings.timmy_disable_csrf = original
os.environ["TIMMY_DISABLE_CSRF"] = old_val
else:
del os.environ["TIMMY_DISABLE_CSRF"]
def test_csrf_token_generation(self): def test_csrf_token_generation(self):
"""CSRF token should be generated and stored in session/state.""" """CSRF token should be generated and stored in session/state."""

View File

@@ -11,14 +11,11 @@ class TestCSRFBypass:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def enable_csrf(self): def enable_csrf(self):
"""Re-enable CSRF for these tests.""" """Re-enable CSRF for these tests."""
import os from config import settings
old_val = os.environ.get("TIMMY_DISABLE_CSRF") original = settings.timmy_disable_csrf
os.environ["TIMMY_DISABLE_CSRF"] = "0" settings.timmy_disable_csrf = False
yield yield
if old_val is not None: settings.timmy_disable_csrf = original
os.environ["TIMMY_DISABLE_CSRF"] = old_val
else:
del os.environ["TIMMY_DISABLE_CSRF"]
def test_csrf_middleware_blocks_unsafe_methods_without_token(self): def test_csrf_middleware_blocks_unsafe_methods_without_token(self):
"""POST should require CSRF token even with AJAX headers (if not explicitly allowed).""" """POST should require CSRF token even with AJAX headers (if not explicitly allowed)."""

View File

@@ -11,14 +11,11 @@ class TestCSRFBypassVulnerability:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def enable_csrf(self): def enable_csrf(self):
"""Re-enable CSRF for these tests.""" """Re-enable CSRF for these tests."""
import os from config import settings
old_val = os.environ.get("TIMMY_DISABLE_CSRF") original = settings.timmy_disable_csrf
os.environ["TIMMY_DISABLE_CSRF"] = "0" settings.timmy_disable_csrf = False
yield yield
if old_val is not None: settings.timmy_disable_csrf = original
os.environ["TIMMY_DISABLE_CSRF"] = old_val
else:
del os.environ["TIMMY_DISABLE_CSRF"]
def test_csrf_bypass_via_traversal_to_exempt_pattern(self): def test_csrf_bypass_via_traversal_to_exempt_pattern(self):
"""Test if a non-exempt route can be accessed by traversing to an exempt pattern. """Test if a non-exempt route can be accessed by traversing to an exempt pattern.

View File

@@ -11,14 +11,11 @@ class TestCSRFTraversal:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def enable_csrf(self): def enable_csrf(self):
"""Re-enable CSRF for these tests.""" """Re-enable CSRF for these tests."""
import os from config import settings
old_val = os.environ.get("TIMMY_DISABLE_CSRF") original = settings.timmy_disable_csrf
os.environ["TIMMY_DISABLE_CSRF"] = "0" settings.timmy_disable_csrf = False
yield yield
if old_val is not None: settings.timmy_disable_csrf = original
os.environ["TIMMY_DISABLE_CSRF"] = old_val
else:
del os.environ["TIMMY_DISABLE_CSRF"]
def test_csrf_middleware_path_traversal_bypass(self): def test_csrf_middleware_path_traversal_bypass(self):
"""Test if path traversal can bypass CSRF exempt patterns.""" """Test if path traversal can bypass CSRF exempt patterns."""

View File

@@ -22,21 +22,18 @@ def test_request_logging_middleware_is_used(client):
def test_csrf_middleware_is_used(client): def test_csrf_middleware_is_used(client):
"""Verify that CSRFMiddleware is used.""" """Verify that CSRFMiddleware is used."""
import os from config import settings
old_val = os.environ.get("TIMMY_DISABLE_CSRF") original = settings.timmy_disable_csrf
os.environ["TIMMY_DISABLE_CSRF"] = "0" settings.timmy_disable_csrf = False
try: try:
# GET request should set a csrf_token cookie if not present # GET request should set a csrf_token cookie if not present
response = client.get("/") response = client.get("/")
assert "csrf_token" in response.cookies assert "csrf_token" in response.cookies
# POST request without token should be blocked (403) # POST request without token should be blocked (403)
# Use a path that isn't likely to be exempt # Use a path that isn't likely to be exempt
response = client.post("/agents/create") response = client.post("/agents/create")
assert response.status_code == 403 assert response.status_code == 403
assert response.json()["code"] == "CSRF_INVALID" assert response.json()["code"] == "CSRF_INVALID"
finally: finally:
if old_val is not None: settings.timmy_disable_csrf = original
os.environ["TIMMY_DISABLE_CSRF"] = old_val
else:
del os.environ["TIMMY_DISABLE_CSRF"]