fix(security): eliminate SQL string formatting in execute() calls

Closes #1911

- insights.py: Pre-compute SELECT queries as class constants instead of
  f-string interpolation at runtime. _SESSION_COLS is now evaluated once
  at class definition time.
- hermes_state.py: Add identifier quoting and whitelist validation for
  ALTER TABLE column names in schema migrations.
- Add 4 tests verifying no injection vectors in SQL query construction.
This commit is contained in:
Peppi Littera
2026-03-19 15:16:35 +01:00
parent d76fa7fc37
commit 219af75704
3 changed files with 63 additions and 13 deletions

View File

@@ -181,22 +181,25 @@ class InsightsEngine:
"billing_base_url, billing_mode, estimated_cost_usd, "
"actual_cost_usd, cost_status, cost_source")
# Pre-computed query strings — f-string evaluated once at class definition,
# not at runtime, so no user-controlled value can alter the query structure.
_GET_SESSIONS_WITH_SOURCE = (
f"SELECT {_SESSION_COLS} FROM sessions"
" WHERE started_at >= ? AND source = ?"
" ORDER BY started_at DESC"
)
_GET_SESSIONS_ALL = (
f"SELECT {_SESSION_COLS} FROM sessions"
" WHERE started_at >= ?"
" ORDER BY started_at DESC"
)
def _get_sessions(self, cutoff: float, source: str = None) -> List[Dict]:
"""Fetch sessions within the time window."""
if source:
cursor = self._conn.execute(
f"""SELECT {self._SESSION_COLS} FROM sessions
WHERE started_at >= ? AND source = ?
ORDER BY started_at DESC""",
(cutoff, source),
)
cursor = self._conn.execute(self._GET_SESSIONS_WITH_SOURCE, (cutoff, source))
else:
cursor = self._conn.execute(
f"""SELECT {self._SESSION_COLS} FROM sessions
WHERE started_at >= ?
ORDER BY started_at DESC""",
(cutoff,),
)
cursor = self._conn.execute(self._GET_SESSIONS_ALL, (cutoff,))
return [dict(row) for row in cursor.fetchall()]
def _get_tool_usage(self, cutoff: float, source: str = None) -> List[Dict]:

View File

@@ -181,7 +181,11 @@ class SessionDB:
]
for name, column_type in new_columns:
try:
cursor.execute(f"ALTER TABLE sessions ADD COLUMN {name} {column_type}")
# name and column_type come from the hardcoded tuple above,
# not user input. Double-quote identifier escaping is applied
# as defense-in-depth; SQLite DDL cannot be parameterized.
safe_name = name.replace('"', '""')
cursor.execute(f'ALTER TABLE sessions ADD COLUMN "{safe_name}" {column_type}')
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 5")

View File

@@ -0,0 +1,43 @@
"""Tests that verify SQL injection mitigations in insights and state modules."""
import re
from agent.insights import InsightsEngine
def test_session_cols_no_injection_chars():
"""_SESSION_COLS must not contain SQL injection vectors."""
cols = InsightsEngine._SESSION_COLS
assert ";" not in cols
assert "--" not in cols
assert "'" not in cols
assert "DROP" not in cols.upper()
def test_get_sessions_all_query_is_parameterized():
"""_GET_SESSIONS_ALL must use a ? placeholder for the cutoff value."""
query = InsightsEngine._GET_SESSIONS_ALL
assert "?" in query
assert "started_at >= ?" in query
# Must not embed any runtime-variable content via brace interpolation
assert "{" not in query
def test_get_sessions_with_source_query_is_parameterized():
"""_GET_SESSIONS_WITH_SOURCE must use ? placeholders for both parameters."""
query = InsightsEngine._GET_SESSIONS_WITH_SOURCE
assert query.count("?") == 2
assert "started_at >= ?" in query
assert "source = ?" in query
assert "{" not in query
def test_session_col_names_are_safe_identifiers():
"""Every column name listed in _SESSION_COLS must be a simple identifier."""
cols = InsightsEngine._SESSION_COLS
identifiers = [c.strip() for c in cols.split(",")]
safe_identifier = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
for col in identifiers:
assert safe_identifier.match(col), (
f"Column name {col!r} is not a safe SQL identifier"
)