Compare commits


1 Commits

Author SHA1 Message Date
Alexander Whitestone
9ba3bfe4fa feat(session): lazy session creation — defer SQLite write until first message
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 20s
32.4% of all sessions (3,564 of 10,985) were empty — created but never
used. The root cause: get_or_create_session() wrote to SQLite
immediately, even for webhook pings and platform handshake events that
never produced a real conversation.

Changes:
- Added _db_persisted flag to SessionEntry (transient, not serialized)
- get_or_create_session() now marks new sessions as _db_persisted=False
  instead of calling db.create_session()
- New ensure_db_session() method flushes the DB record lazily on first
  real user message
- Gateway _handle_message_with_agent() calls ensure_db_session() before
  dispatching to the agent
- reset_session() also defers DB create for consistency

Impact: Eliminates ~3,500 wasted session records per cycle. Sessions
that never receive a message never hit the DB.
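
A minimal, self-contained sketch of the lazy-persistence pattern described above. The Entry/Store names and the sqlite3 schema here are invented for illustration only; the project's actual fields and calls appear in the file diffs below.

import sqlite3
from dataclasses import dataclass


@dataclass
class Entry:
    session_id: str
    # Transient flag: True once the row exists in SQLite.
    db_persisted: bool = False


class Store:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._db = sqlite3.connect(db_path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS sessions (session_id TEXT PRIMARY KEY)"
        )
        self._entries: dict[str, Entry] = {}

    def get_or_create_session(self, session_id: str) -> Entry:
        # No DB write here: webhook pings and handshakes stop at this step.
        return self._entries.setdefault(session_id, Entry(session_id))

    def ensure_db_session(self, entry: Entry) -> None:
        # Flush lazily on the first real message; a no-op afterwards.
        if entry.db_persisted:
            return
        self._db.execute(
            "INSERT OR IGNORE INTO sessions (session_id) VALUES (?)",
            (entry.session_id,),
        )
        self._db.commit()
        entry.db_persisted = True


store = Store()
ping = store.get_or_create_session("webhook-ping")   # never touches SQLite
chat = store.get_or_create_session("dm:alice")
store.ensure_db_session(chat)                         # persisted on first message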

Refs #314
2026-04-13 17:25:58 -04:00
3 changed files with 34 additions and 164 deletions

View File

@@ -1,154 +0,0 @@
#!/usr/bin/env python3
"""
deploy-crons — normalize cron job schemas for consistent model field types.

This script ensures that the model field in jobs.json is always a dict when
either model or provider is specified, preventing schema inconsistency.

Usage:
    python deploy-crons.py [--dry-run] [--jobs-file PATH]
"""

import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict, Optional


def normalize_job(job: Dict[str, Any]) -> Dict[str, Any]:
    """
    Normalize a job dict to ensure consistent model field types.

    Before normalization:
    - If model AND provider: model = raw string, provider = raw string (inconsistent)
    - If only model: model = raw string
    - If only provider: provider = raw string at top level

    After normalization:
    - If model exists: model = {"model": "xxx"}
    - If provider exists: model = {"provider": "yyy"}
    - If both exist: model = {"model": "xxx", "provider": "yyy"}
    - If neither: model = None
    """
    job = dict(job)  # Create a copy to avoid modifying the original
    model = job.get("model")
    provider = job.get("provider")

    # Skip if already normalized (model is a dict)
    if isinstance(model, dict):
        return job

    # Build normalized model dict
    model_dict = {}
    if model is not None and isinstance(model, str):
        model_dict["model"] = model.strip()
    if provider is not None and isinstance(provider, str):
        model_dict["provider"] = provider.strip()

    # Set model field
    if model_dict:
        job["model"] = model_dict
    else:
        job["model"] = None
    # The top-level provider field is intentionally NOT removed after being
    # copied into the model dict.
    if provider is not None and "provider" in model_dict:
        # Keep the provider field for backward compatibility: existing code
        # that reads job["provider"] continues to work.
        pass

    return job
def normalize_jobs_file(jobs_file: Path, dry_run: bool = False) -> int:
    """
    Normalize all jobs in a jobs.json file.

    Returns an exit code: 0 on success (including when nothing needs changing),
    1 if the file is missing, contains invalid JSON, or cannot be written.
    """
    if not jobs_file.exists():
        print(f"Error: Jobs file not found: {jobs_file}", file=sys.stderr)
        return 1

    try:
        with open(jobs_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in {jobs_file}: {e}", file=sys.stderr)
        return 1

    jobs = data.get("jobs", [])
    if not jobs:
        print("No jobs found in file.")
        return 0

    modified_count = 0
    for i, job in enumerate(jobs):
        original_model = job.get("model")
        original_provider = job.get("provider")
        normalized_job = normalize_job(job)

        # Check if anything changed
        if (normalized_job.get("model") != original_model or
                normalized_job.get("provider") != original_provider):
            jobs[i] = normalized_job
            modified_count += 1
            job_id = job.get("id", "?")
            job_name = job.get("name", "(unnamed)")
            print(f"Normalized job {job_id} ({job_name}):")
            print(f"  model: {original_model!r} -> {normalized_job.get('model')!r}")
            print(f"  provider: {original_provider!r} -> {normalized_job.get('provider')!r}")

    if modified_count == 0:
        print("All jobs already have consistent model field types.")
        return 0

    if dry_run:
        print(f"DRY RUN: Would normalize {modified_count} jobs.")
        return 0

    # Write back to file
    data["jobs"] = jobs
    try:
        with open(jobs_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"Normalized {modified_count} jobs in {jobs_file}")
        return 0
    except Exception as e:
        print(f"Error writing to {jobs_file}: {e}", file=sys.stderr)
        return 1
def main():
    parser = argparse.ArgumentParser(
        description="Normalize cron job schemas for consistent model field types."
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be changed without modifying the file."
    )
    parser.add_argument(
        "--jobs-file",
        type=Path,
        default=Path.home() / ".hermes" / "cron" / "jobs.json",
        help="Path to jobs.json file (default: ~/.hermes/cron/jobs.json)"
    )
    args = parser.parse_args()

    if args.dry_run:
        print("DRY RUN MODE — no changes will be made.")
        print()

    return normalize_jobs_file(args.jobs_file, args.dry_run)


if __name__ == "__main__":
    sys.exit(main())
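
For reference, the before/after shape described in normalize_job's docstring, using hypothetical values run through normalize_job from the script above (the id, model, and provider strings are made up):

before = {"id": "daily-digest", "model": "gpt-4o-mini", "provider": "openai"}
after = normalize_job(before)

assert after["model"] == {"model": "gpt-4o-mini", "provider": "openai"}
assert after["provider"] == "openai"   # left in place for backward compatibility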

View File

@@ -2304,6 +2304,9 @@ class GatewayRunner:
        # Get or create session
        session_entry = self.session_store.get_or_create_session(source)
        session_key = session_entry.session_key

        # Lazy session creation: persist to SQLite on first real message
        self.session_store.ensure_db_session(session_entry)

        # Emit session:start for new or auto-reset sessions
        _is_new_session = (

View File

@@ -383,6 +383,11 @@ class SessionEntry:
    # survives gateway restarts (the old in-memory _pre_flushed_sessions
    # set was lost on restart, causing redundant re-flushes).
    memory_flushed: bool = False

    # Lazy session creation: tracks whether the session record has been
    # written to SQLite. New sessions start False; the DB write is
    # deferred until the first user message arrives.
    _db_persisted: bool = True

    def to_dict(self) -> Dict[str, Any]:
        result = {
@@ -763,11 +768,10 @@ class SessionStore:
        except Exception as e:
            logger.debug("Session DB operation failed: %s", e)

        if self._db and db_create_kwargs:
            try:
                self._db.create_session(**db_create_kwargs)
            except Exception as e:
                print(f"[gateway] Warning: Failed to create SQLite session: {e}")
            # Lazy session creation: defer DB write until first user message.
            # Mark the entry as not yet persisted; ensure_db_session() will
            # flush it when the gateway receives an actual message.
            entry._db_persisted = False

        # Seed new DM thread sessions with parent DM session history.
        # When a bot reply creates a Slack thread and the user responds in it,
@@ -806,6 +810,26 @@ class SessionStore:
        return entry

    def ensure_db_session(self, entry: SessionEntry) -> None:
        """Lazily persist a session to SQLite on first user message.

        Called by the gateway message handler when a real message arrives.
        If the session is already persisted, this is a no-op.
        """
        if entry._db_persisted or not self._db:
            return
        try:
            source_val = entry.platform.value if entry.platform else "unknown"
            user_id = entry.origin.user_id if entry.origin else None
            self._db.create_session(
                session_id=entry.session_id,
                source=source_val,
                user_id=user_id,
            )
            entry._db_persisted = True
        except Exception as e:
            logger.warning("Failed to lazily create SQLite session: %s", e)

    def update_session(
        self,
        session_key: str,
@@ -865,11 +889,8 @@ class SessionStore:
        except Exception as e:
            logger.debug("Session DB operation failed: %s", e)

        if self._db and db_create_kwargs:
            try:
                self._db.create_session(**db_create_kwargs)
            except Exception as e:
                logger.debug("Session DB operation failed: %s", e)
            # Lazy: defer DB create until first message
            new_entry._db_persisted = False

        return new_entry