feat: update database schema and enhance message persistence

- Incremented schema version to 2 and added a new column `finish_reason` to the `messages` table.
- Implemented a method to flush un-logged messages to the session database, ensuring data integrity during conversation interruptions.
- Enhanced error handling to persist messages in various early-return scenarios, preventing data loss.
This commit is contained in:
teknium1
2026-02-21 00:05:39 -08:00
parent c48817f69b
commit b33ed9176f
2 changed files with 95 additions and 45 deletions

View File

@@ -24,7 +24,7 @@ from typing import Dict, Any, List, Optional
DEFAULT_DB_PATH = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "state.db"
SCHEMA_VERSION = 1
SCHEMA_VERSION = 2
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -58,7 +58,8 @@ CREATE TABLE IF NOT EXISTS messages (
tool_calls TEXT,
tool_name TEXT,
timestamp REAL NOT NULL,
token_count INTEGER
token_count INTEGER,
finish_reason TEXT
);
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
@@ -118,12 +119,21 @@ class SessionDB:
cursor.executescript(SCHEMA_SQL)
# Check schema version
# Check schema version and run migrations
cursor.execute("SELECT version FROM schema_version LIMIT 1")
row = cursor.fetchone()
if row is None:
cursor.execute("INSERT INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,))
# Future migrations would go here: if row["version"] < 2: ...
else:
current_version = row["version"] if isinstance(row, sqlite3.Row) else row[0]
if current_version < 2:
# v2: add finish_reason column to messages
try:
cursor.execute("ALTER TABLE messages ADD COLUMN finish_reason TEXT")
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 2")
# FTS5 setup (separate because CREATE VIRTUAL TABLE can't be in executescript with IF NOT EXISTS reliably)
try:
@@ -222,6 +232,7 @@ class SessionDB:
tool_calls: Any = None,
tool_call_id: str = None,
token_count: int = None,
finish_reason: str = None,
) -> int:
"""
Append a message to a session. Returns the message row ID.
@@ -231,8 +242,8 @@ class SessionDB:
"""
cursor = self._conn.execute(
"""INSERT INTO messages (session_id, role, content, tool_call_id,
tool_calls, tool_name, timestamp, token_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
tool_calls, tool_name, timestamp, token_count, finish_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
role,
@@ -242,6 +253,7 @@ class SessionDB:
tool_name,
time.time(),
token_count,
finish_reason,
),
)
msg_id = cursor.lastrowid

View File

@@ -1700,6 +1700,40 @@ class AIAgent:
if self.verbose_logging:
logging.warning(f"Failed to cleanup browser for task {task_id}: {e}")
def _flush_messages_to_session_db(self, messages: List[Dict], conversation_history: List[Dict] = None):
"""Persist any un-logged messages to the SQLite session store.
Called both at the normal end of run_conversation and from every early-
return path so that tool calls, tool responses, and assistant messages
are never lost even when the conversation errors out.
"""
if not self._session_db:
return
try:
start_idx = (len(conversation_history) if conversation_history else 0) + 1
for msg in messages[start_idx:]:
role = msg.get("role", "unknown")
content = msg.get("content")
tool_calls_data = None
if hasattr(msg, "tool_calls") and msg.tool_calls:
tool_calls_data = [
{"name": tc.function.name, "arguments": tc.function.arguments}
for tc in msg.tool_calls
]
elif isinstance(msg.get("tool_calls"), list):
tool_calls_data = msg["tool_calls"]
self._session_db.append_message(
session_id=self.session_id,
role=role,
content=content,
tool_name=msg.get("tool_name"),
tool_calls=tool_calls_data,
tool_call_id=msg.get("tool_call_id"),
finish_reason=msg.get("finish_reason"),
)
except Exception:
pass
def _get_messages_up_to_last_assistant(self, messages: List[Dict]) -> List[Dict]:
"""
Get messages up to (but not including) the last assistant turn.
@@ -2601,6 +2635,7 @@ class AIAgent:
if retry_count > max_retries:
print(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded for invalid responses. Giving up.")
logging.error(f"{self.log_prefix}Invalid API response after {max_retries} retries.")
self._flush_messages_to_session_db(messages, conversation_history)
return {
"messages": messages,
"completed": False,
@@ -2619,6 +2654,7 @@ class AIAgent:
while time.time() < sleep_end:
if self._interrupt_requested:
print(f"{self.log_prefix}⚡ Interrupt detected during retry wait, aborting.")
self._flush_messages_to_session_db(messages, conversation_history)
return {
"final_response": "Operation interrupted.",
"messages": messages,
@@ -2642,6 +2678,7 @@ class AIAgent:
rolled_back_messages = self._get_messages_up_to_last_assistant(messages)
self._cleanup_task_resources(effective_task_id)
self._flush_messages_to_session_db(messages, conversation_history)
return {
"final_response": None,
@@ -2654,6 +2691,7 @@ class AIAgent:
else:
# First message was truncated - mark as failed
print(f"{self.log_prefix}❌ First response truncated - cannot recover")
self._flush_messages_to_session_db(messages, conversation_history)
return {
"final_response": None,
"messages": messages,
@@ -2708,6 +2746,7 @@ class AIAgent:
# Check for interrupt before deciding to retry
if self._interrupt_requested:
print(f"{self.log_prefix}⚡ Interrupt detected during error handling, aborting retries.")
self._flush_messages_to_session_db(messages, conversation_history)
return {
"final_response": "Operation interrupted.",
"messages": messages,
@@ -2736,6 +2775,7 @@ class AIAgent:
print(f"{self.log_prefix}❌ Non-retryable client error detected. Aborting immediately.")
print(f"{self.log_prefix} 💡 This type of error won't be fixed by retrying.")
logging.error(f"{self.log_prefix}Non-retryable client error: {api_error}")
self._flush_messages_to_session_db(messages, conversation_history)
return {
"final_response": None,
"messages": messages,
@@ -2789,6 +2829,7 @@ class AIAgent:
print(f"{self.log_prefix}❌ Context length exceeded and cannot compress further.")
print(f"{self.log_prefix} 💡 The conversation has accumulated too much content.")
logging.error(f"{self.log_prefix}Context length exceeded: {approx_tokens:,} tokens. Cannot compress further.")
self._flush_messages_to_session_db(messages, conversation_history)
return {
"messages": messages,
"completed": False,
@@ -2814,6 +2855,7 @@ class AIAgent:
while time.time() < sleep_end:
if self._interrupt_requested:
print(f"{self.log_prefix}⚡ Interrupt detected during retry wait, aborting.")
self._flush_messages_to_session_db(messages, conversation_history)
return {
"final_response": "Operation interrupted.",
"messages": messages,
@@ -2850,6 +2892,7 @@ class AIAgent:
rolled_back_messages = self._get_messages_up_to_last_assistant(messages)
self._cleanup_task_resources(effective_task_id)
self._flush_messages_to_session_db(messages, conversation_history)
return {
"final_response": None,
@@ -2896,10 +2939,11 @@ class AIAgent:
else:
print(f"{self.log_prefix}❌ Max retries (3) for invalid tool calls exceeded. Stopping as partial.")
# Return partial result - don't include the bad tool call in messages
self._invalid_tool_retries = 0 # Reset for next conversation
self._invalid_tool_retries = 0
self._flush_messages_to_session_db(messages, conversation_history)
return {
"final_response": None,
"messages": messages, # Messages up to last valid point
"messages": messages,
"api_calls": api_call_count,
"completed": False,
"partial": True,
@@ -2971,7 +3015,8 @@ class AIAgent:
assistant_msg = {
"role": "assistant",
"content": assistant_message.content or "",
"reasoning": reasoning_text, # Stored for trajectory extraction & API calls
"reasoning": reasoning_text,
"finish_reason": finish_reason,
"tool_calls": [
{
"id": tool_call.id,
@@ -3228,25 +3273,42 @@ class AIAgent:
self._empty_content_retries = 0
self._empty_content_retries += 1
content_preview = final_response[:80] + "..." if len(final_response) > 80 else final_response
# Show the reasoning/thinking content so the user can see
# what the model was thinking even though content is empty
reasoning_text = self._extract_reasoning(assistant_message)
print(f"{self.log_prefix}⚠️ Response only contains think block with no content after it")
print(f"{self.log_prefix} Content: '{content_preview}'")
if reasoning_text:
reasoning_preview = reasoning_text[:500] + "..." if len(reasoning_text) > 500 else reasoning_text
print(f"{self.log_prefix} Reasoning: {reasoning_preview}")
else:
content_preview = final_response[:80] + "..." if len(final_response) > 80 else final_response
print(f"{self.log_prefix} Content: '{content_preview}'")
if self._empty_content_retries < 3:
print(f"{self.log_prefix}🔄 Retrying API call ({self._empty_content_retries}/3)...")
# Don't add the incomplete message, just retry
continue
else:
# Max retries exceeded - roll back to last complete assistant turn
print(f"{self.log_prefix}❌ Max retries (3) for empty content exceeded. Rolling back to last complete turn.")
self._empty_content_retries = 0 # Reset for next conversation
# Max retries exceeded — keep the message history intact
# and return what we have so the session is preserved.
print(f"{self.log_prefix}❌ Max retries (3) for empty content exceeded.")
self._empty_content_retries = 0
# Append the empty assistant message so the session
# log reflects what actually happened
empty_msg = {
"role": "assistant",
"content": final_response,
"reasoning": reasoning_text,
"finish_reason": finish_reason,
}
messages.append(empty_msg)
rolled_back_messages = self._get_messages_up_to_last_assistant(messages)
self._cleanup_task_resources(effective_task_id)
self._flush_messages_to_session_db(messages, conversation_history)
return {
"final_response": None,
"messages": rolled_back_messages,
"final_response": final_response or None,
"messages": messages,
"api_calls": api_call_count,
"completed": False,
"partial": True,
@@ -3265,11 +3327,11 @@ class AIAgent:
logging.debug(f"Captured final reasoning ({len(reasoning_text)} chars): {preview}")
# Build final assistant message
# Content stays as-is; reasoning stored separately for trajectory extraction
final_msg = {
"role": "assistant",
"content": final_response,
"reasoning": reasoning_text # Stored for trajectory extraction
"reasoning": reasoning_text,
"finish_reason": finish_reason,
}
# Store reasoning_details for multi-turn reasoning context (OpenRouter)
@@ -3373,31 +3435,7 @@ class AIAgent:
self._session_messages = messages
self._save_session_log(messages)
# Log new messages to SQLite session store (everything after the user message we already logged)
if self._session_db:
try:
# Skip messages that were in the conversation history before this call
# (the user message was already logged at the start of run_conversation)
start_idx = (len(conversation_history) if conversation_history else 0) + 1 # +1 for the user msg
for msg in messages[start_idx:]:
role = msg.get("role", "unknown")
content = msg.get("content")
# Extract tool call info from assistant messages
tool_calls_data = None
if hasattr(msg, "tool_calls") and msg.tool_calls:
tool_calls_data = [{"name": tc.function.name, "arguments": tc.function.arguments} for tc in msg.tool_calls]
elif isinstance(msg.get("tool_calls"), list):
tool_calls_data = msg["tool_calls"]
self._session_db.append_message(
session_id=self.session_id,
role=role,
content=content,
tool_name=msg.get("tool_name"),
tool_calls=tool_calls_data,
tool_call_id=msg.get("tool_call_id"),
)
except Exception:
pass
self._flush_messages_to_session_db(messages, conversation_history)
# Build result with interrupt info if applicable
result = {