fix: add title validation — sanitize, length limit, control char stripping
- Add SessionDB.sanitize_title() static method: - Strips ASCII control chars (null, bell, ESC, etc.) except whitespace - Strips problematic Unicode controls (zero-width, RTL override, BOM) - Collapses whitespace runs, strips edges - Normalizes empty/whitespace-only to None - Enforces 100 char max length (raises ValueError) - set_session_title() now calls sanitize_title() internally, so all call sites (CLI, gateway, auto-lineage) are protected - CLI /title handler sanitizes early to show correct feedback - Gateway /title handler sanitizes early to show correct feedback - 24 new tests: sanitize_title (17 cases covering control chars, zero-width, RTL, BOM, emoji, CJK, length, integration), gateway validation (too long, control chars, only-control-chars)
This commit is contained in:
20
cli.py
20
cli.py
@@ -2116,12 +2116,20 @@ class HermesCLI:
|
||||
elif cmd_lower.startswith("/title"):
|
||||
parts = cmd_original.split(maxsplit=1)
|
||||
if len(parts) > 1:
|
||||
new_title = parts[1].strip()
|
||||
if new_title:
|
||||
raw_title = parts[1].strip()
|
||||
if raw_title:
|
||||
if self._session_db:
|
||||
# Check if session exists in DB yet
|
||||
session = self._session_db.get_session(self.session_id)
|
||||
if session:
|
||||
# Sanitize the title early so feedback matches what gets stored
|
||||
try:
|
||||
from hermes_state import SessionDB
|
||||
new_title = SessionDB.sanitize_title(raw_title)
|
||||
except ValueError as e:
|
||||
_cprint(f" {e}")
|
||||
new_title = None
|
||||
if not new_title:
|
||||
_cprint(" Title is empty after cleanup. Please use printable characters.")
|
||||
elif self._session_db.get_session(self.session_id):
|
||||
# Session exists in DB — set title directly
|
||||
try:
|
||||
if self._session_db.set_session_title(self.session_id, new_title):
|
||||
_cprint(f" Session title set: {new_title}")
|
||||
@@ -2131,7 +2139,7 @@ class HermesCLI:
|
||||
_cprint(f" {e}")
|
||||
else:
|
||||
# Session not created yet — defer the title
|
||||
# Check uniqueness proactively
|
||||
# Check uniqueness proactively with the sanitized title
|
||||
existing = self._session_db.get_session_by_title(new_title)
|
||||
if existing:
|
||||
_cprint(f" Title '{new_title}' is already in use by session {existing['id']}")
|
||||
|
||||
@@ -1707,10 +1707,17 @@ class GatewayRunner:
|
||||
|
||||
title_arg = event.get_command_args().strip()
|
||||
if title_arg:
|
||||
# Sanitize the title before setting
|
||||
try:
|
||||
sanitized = self._session_db.sanitize_title(title_arg)
|
||||
except ValueError as e:
|
||||
return f"⚠️ {e}"
|
||||
if not sanitized:
|
||||
return "⚠️ Title is empty after cleanup. Please use printable characters."
|
||||
# Set the title
|
||||
try:
|
||||
if self._session_db.set_session_title(session_id, title_arg):
|
||||
return f"✏️ Session title set: **{title_arg}**"
|
||||
if self._session_db.set_session_title(session_id, sanitized):
|
||||
return f"✏️ Session title set: **{sanitized}**"
|
||||
else:
|
||||
return "Session not found in database."
|
||||
except ValueError as e:
|
||||
|
||||
@@ -246,17 +246,64 @@ class SessionDB:
|
||||
row = cursor.fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
# Maximum length for session titles
|
||||
MAX_TITLE_LENGTH = 100
|
||||
|
||||
@staticmethod
|
||||
def sanitize_title(title: Optional[str]) -> Optional[str]:
|
||||
"""Validate and sanitize a session title.
|
||||
|
||||
- Strips leading/trailing whitespace
|
||||
- Removes ASCII control characters (0x00-0x1F, 0x7F) and problematic
|
||||
Unicode control chars (zero-width, RTL/LTR overrides, etc.)
|
||||
- Collapses internal whitespace runs to single spaces
|
||||
- Normalizes empty/whitespace-only strings to None
|
||||
- Enforces MAX_TITLE_LENGTH
|
||||
|
||||
Returns the cleaned title string or None.
|
||||
Raises ValueError if the title exceeds MAX_TITLE_LENGTH after cleaning.
|
||||
"""
|
||||
if not title:
|
||||
return None
|
||||
|
||||
import re
|
||||
|
||||
# Remove ASCII control characters (0x00-0x1F, 0x7F) but keep
|
||||
# whitespace chars (\t=0x09, \n=0x0A, \r=0x0D) so they can be
|
||||
# normalized to spaces by the whitespace collapsing step below
|
||||
cleaned = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', title)
|
||||
|
||||
# Remove problematic Unicode control characters:
|
||||
# - Zero-width chars (U+200B-U+200F, U+FEFF)
|
||||
# - Directional overrides (U+202A-U+202E, U+2066-U+2069)
|
||||
# - Object replacement (U+FFFC), interlinear annotation (U+FFF9-U+FFFB)
|
||||
cleaned = re.sub(
|
||||
r'[\u200b-\u200f\u2028-\u202e\u2060-\u2069\ufeff\ufffc\ufff9-\ufffb]',
|
||||
'', cleaned,
|
||||
)
|
||||
|
||||
# Collapse internal whitespace runs and strip
|
||||
cleaned = re.sub(r'\s+', ' ', cleaned).strip()
|
||||
|
||||
if not cleaned:
|
||||
return None
|
||||
|
||||
if len(cleaned) > SessionDB.MAX_TITLE_LENGTH:
|
||||
raise ValueError(
|
||||
f"Title too long ({len(cleaned)} chars, max {SessionDB.MAX_TITLE_LENGTH})"
|
||||
)
|
||||
|
||||
return cleaned
|
||||
|
||||
def set_session_title(self, session_id: str, title: str) -> bool:
|
||||
"""Set or update a session's title.
|
||||
|
||||
Returns True if session was found and title was set.
|
||||
Raises ValueError if title is already in use by another session.
|
||||
Empty strings are normalized to None (clearing the title).
|
||||
Raises ValueError if title is already in use by another session,
|
||||
or if the title fails validation (too long, invalid characters).
|
||||
Empty/whitespace-only strings are normalized to None (clearing the title).
|
||||
"""
|
||||
# Normalize empty string to None so it doesn't conflict with the
|
||||
# unique index (only non-NULL values are constrained)
|
||||
if not title:
|
||||
title = None
|
||||
title = self.sanitize_title(title)
|
||||
if title:
|
||||
# Check uniqueness (allow the same session to keep its own title)
|
||||
cursor = self._conn.execute(
|
||||
|
||||
@@ -122,6 +122,48 @@ class TestHandleTitleCommand:
|
||||
result = await runner._handle_title_command(event)
|
||||
assert "not available" in result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_title_too_long(self, tmp_path):
|
||||
"""Setting a title that exceeds max length returns error."""
|
||||
from hermes_state import SessionDB
|
||||
db = SessionDB(db_path=tmp_path / "state.db")
|
||||
db.create_session("test_session_123", "telegram")
|
||||
|
||||
runner = _make_runner(session_db=db)
|
||||
long_title = "A" * 150
|
||||
event = _make_event(text=f"/title {long_title}")
|
||||
result = await runner._handle_title_command(event)
|
||||
assert "too long" in result
|
||||
assert "⚠️" in result
|
||||
db.close()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_title_control_chars_sanitized(self, tmp_path):
|
||||
"""Control characters are stripped and sanitized title is stored."""
|
||||
from hermes_state import SessionDB
|
||||
db = SessionDB(db_path=tmp_path / "state.db")
|
||||
db.create_session("test_session_123", "telegram")
|
||||
|
||||
runner = _make_runner(session_db=db)
|
||||
event = _make_event(text="/title hello\x00world")
|
||||
result = await runner._handle_title_command(event)
|
||||
assert "helloworld" in result
|
||||
assert db.get_session_title("test_session_123") == "helloworld"
|
||||
db.close()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_title_only_control_chars(self, tmp_path):
|
||||
"""Title with only control chars returns empty error."""
|
||||
from hermes_state import SessionDB
|
||||
db = SessionDB(db_path=tmp_path / "state.db")
|
||||
db.create_session("test_session_123", "telegram")
|
||||
|
||||
runner = _make_runner(session_db=db)
|
||||
event = _make_event(text="/title \x00\x01\x02")
|
||||
result = await runner._handle_title_command(event)
|
||||
assert "empty after cleanup" in result
|
||||
db.close()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_works_across_platforms(self, tmp_path):
|
||||
"""The /title command works for Discord, Slack, and WhatsApp too."""
|
||||
|
||||
@@ -435,6 +435,89 @@ class TestSessionTitle:
|
||||
assert session["ended_at"] is not None
|
||||
|
||||
|
||||
class TestSanitizeTitle:
|
||||
"""Tests for SessionDB.sanitize_title() validation and cleaning."""
|
||||
|
||||
def test_normal_title_unchanged(self):
|
||||
assert SessionDB.sanitize_title("My Project") == "My Project"
|
||||
|
||||
def test_strips_whitespace(self):
|
||||
assert SessionDB.sanitize_title(" hello world ") == "hello world"
|
||||
|
||||
def test_collapses_internal_whitespace(self):
|
||||
assert SessionDB.sanitize_title("hello world") == "hello world"
|
||||
|
||||
def test_tabs_and_newlines_collapsed(self):
|
||||
assert SessionDB.sanitize_title("hello\t\nworld") == "hello world"
|
||||
|
||||
def test_none_returns_none(self):
|
||||
assert SessionDB.sanitize_title(None) is None
|
||||
|
||||
def test_empty_string_returns_none(self):
|
||||
assert SessionDB.sanitize_title("") is None
|
||||
|
||||
def test_whitespace_only_returns_none(self):
|
||||
assert SessionDB.sanitize_title(" \t\n ") is None
|
||||
|
||||
def test_control_chars_stripped(self):
|
||||
# Null byte, bell, backspace, etc.
|
||||
assert SessionDB.sanitize_title("hello\x00world") == "helloworld"
|
||||
assert SessionDB.sanitize_title("\x07\x08test\x1b") == "test"
|
||||
|
||||
def test_del_char_stripped(self):
|
||||
assert SessionDB.sanitize_title("hello\x7fworld") == "helloworld"
|
||||
|
||||
def test_zero_width_chars_stripped(self):
|
||||
# Zero-width space (U+200B), zero-width joiner (U+200D)
|
||||
assert SessionDB.sanitize_title("hello\u200bworld") == "helloworld"
|
||||
assert SessionDB.sanitize_title("hello\u200dworld") == "helloworld"
|
||||
|
||||
def test_rtl_override_stripped(self):
|
||||
# Right-to-left override (U+202E) — used in filename spoofing attacks
|
||||
assert SessionDB.sanitize_title("hello\u202eworld") == "helloworld"
|
||||
|
||||
def test_bom_stripped(self):
|
||||
# Byte order mark (U+FEFF)
|
||||
assert SessionDB.sanitize_title("\ufeffhello") == "hello"
|
||||
|
||||
def test_only_control_chars_returns_none(self):
|
||||
assert SessionDB.sanitize_title("\x00\x01\x02\u200b\ufeff") is None
|
||||
|
||||
def test_max_length_allowed(self):
|
||||
title = "A" * 100
|
||||
assert SessionDB.sanitize_title(title) == title
|
||||
|
||||
def test_exceeds_max_length_raises(self):
|
||||
title = "A" * 101
|
||||
with pytest.raises(ValueError, match="too long"):
|
||||
SessionDB.sanitize_title(title)
|
||||
|
||||
def test_unicode_emoji_allowed(self):
|
||||
assert SessionDB.sanitize_title("🚀 My Project 🎉") == "🚀 My Project 🎉"
|
||||
|
||||
def test_cjk_characters_allowed(self):
|
||||
assert SessionDB.sanitize_title("我的项目") == "我的项目"
|
||||
|
||||
def test_accented_characters_allowed(self):
|
||||
assert SessionDB.sanitize_title("Résumé éditing") == "Résumé éditing"
|
||||
|
||||
def test_special_punctuation_allowed(self):
|
||||
title = "PR #438 — fixing the 'auth' middleware"
|
||||
assert SessionDB.sanitize_title(title) == title
|
||||
|
||||
def test_sanitize_applied_in_set_session_title(self, db):
|
||||
"""set_session_title applies sanitize_title internally."""
|
||||
db.create_session("s1", "cli")
|
||||
db.set_session_title("s1", " hello\x00 world ")
|
||||
assert db.get_session("s1")["title"] == "hello world"
|
||||
|
||||
def test_too_long_title_rejected_by_set(self, db):
|
||||
"""set_session_title raises ValueError for overly long titles."""
|
||||
db.create_session("s1", "cli")
|
||||
with pytest.raises(ValueError, match="too long"):
|
||||
db.set_session_title("s1", "X" * 150)
|
||||
|
||||
|
||||
class TestSchemaInit:
|
||||
def test_wal_mode(self, db):
|
||||
cursor = db._conn.execute("PRAGMA journal_mode")
|
||||
|
||||
Reference in New Issue
Block a user