Compare commits
1 Commits
burn/349-1
...
burn/323-1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b62fa0ec13 |
14
cli.py
14
cli.py
@@ -4123,19 +4123,7 @@ class HermesCLI:
|
||||
print(f" Skills: {', '.join(job['skills'])}")
|
||||
print(f" Prompt: {job.get('prompt_preview', '')}")
|
||||
if job.get("last_run_at"):
|
||||
status = job.get('last_status', '?')
|
||||
if status == "error" and job.get("last_error"):
|
||||
# Show error with staleness hint
|
||||
last_success = job.get("last_success_at", "")
|
||||
if last_success and last_success > job.get("last_error_at", ""):
|
||||
print(f" Last run: {job['last_run_at']} (recovered, last error was before {last_success})")
|
||||
else:
|
||||
err_preview = job['last_error'][:80]
|
||||
print(f" Last run: {job['last_run_at']} (error: {err_preview}...)")
|
||||
elif status == "retrying":
|
||||
print(f" Last run: {job['last_run_at']} (retrying...)")
|
||||
else:
|
||||
print(f" Last run: {job['last_run_at']} ({status})")
|
||||
print(f" Last run: {job['last_run_at']} ({job.get('last_status', '?')})")
|
||||
print()
|
||||
return
|
||||
|
||||
|
||||
27
cron/jobs.py
27
cron/jobs.py
@@ -528,11 +528,7 @@ def pause_job(job_id: str, reason: Optional[str] = None) -> Optional[Dict[str, A
|
||||
|
||||
|
||||
def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Resume a paused job and compute the next future run from now.
|
||||
|
||||
Clears stale error state so the job's health reflects the upcoming
|
||||
run rather than a previous failure that may have caused the pause.
|
||||
"""
|
||||
"""Resume a paused job and compute the next future run from now."""
|
||||
job = get_job(job_id)
|
||||
if not job:
|
||||
return None
|
||||
@@ -546,19 +542,12 @@ def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
|
||||
"paused_at": None,
|
||||
"paused_reason": None,
|
||||
"next_run_at": next_run_at,
|
||||
# Clear stale error — resuming is an explicit retry signal.
|
||||
"last_error": None,
|
||||
"last_status": "retrying" if job.get("last_status") == "error" else job.get("last_status"),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Schedule a job to run on the next scheduler tick.
|
||||
|
||||
Clears stale error state so the job's health reflects the upcoming
|
||||
re-run rather than the previous failure.
|
||||
"""
|
||||
"""Schedule a job to run on the next scheduler tick."""
|
||||
job = get_job(job_id)
|
||||
if not job:
|
||||
return None
|
||||
@@ -570,10 +559,6 @@ def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
|
||||
"paused_at": None,
|
||||
"paused_reason": None,
|
||||
"next_run_at": _hermes_now().isoformat(),
|
||||
# Clear stale error — the job is being retried.
|
||||
# The next run will set last_error on failure or None on success.
|
||||
"last_error": None,
|
||||
"last_status": "retrying" if job.get("last_status") == "error" else job.get("last_status"),
|
||||
},
|
||||
)
|
||||
|
||||
@@ -633,8 +618,6 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
|
||||
|
||||
Updates last_run_at, last_status, increments completed count,
|
||||
computes next_run_at, and auto-deletes if repeat limit reached.
|
||||
Also tracks last_error_at and last_success_at timestamps so callers
|
||||
can distinguish current health from historical failure residue.
|
||||
"""
|
||||
jobs = load_jobs()
|
||||
for i, job in enumerate(jobs):
|
||||
@@ -644,12 +627,6 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
|
||||
job["last_status"] = "ok" if success else "error"
|
||||
job["last_error"] = error if not success else None
|
||||
|
||||
# Track timestamps for health distinction
|
||||
if success:
|
||||
job["last_success_at"] = now
|
||||
else:
|
||||
job["last_error_at"] = now
|
||||
|
||||
# Increment completed count
|
||||
if job.get("repeat"):
|
||||
job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1
|
||||
|
||||
104
hermes_state.py
104
hermes_state.py
@@ -32,7 +32,7 @@ T = TypeVar("T")
|
||||
|
||||
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
|
||||
|
||||
SCHEMA_VERSION = 6
|
||||
SCHEMA_VERSION = 7
|
||||
|
||||
SCHEMA_SQL = """
|
||||
CREATE TABLE IF NOT EXISTS schema_version (
|
||||
@@ -66,6 +66,7 @@ CREATE TABLE IF NOT EXISTS sessions (
|
||||
cost_source TEXT,
|
||||
pricing_version TEXT,
|
||||
title TEXT,
|
||||
profile TEXT,
|
||||
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
|
||||
);
|
||||
|
||||
@@ -86,6 +87,7 @@ CREATE TABLE IF NOT EXISTS messages (
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile);
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
|
||||
@@ -330,6 +332,19 @@ class SessionDB:
|
||||
except sqlite3.OperationalError:
|
||||
pass # Column already exists
|
||||
cursor.execute("UPDATE schema_version SET version = 6")
|
||||
if current_version < 7:
|
||||
# v7: add profile column to sessions for profile isolation (#323)
|
||||
try:
|
||||
cursor.execute('ALTER TABLE sessions ADD COLUMN "profile" TEXT')
|
||||
except sqlite3.OperationalError:
|
||||
pass # Column already exists
|
||||
try:
|
||||
cursor.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile)"
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
cursor.execute("UPDATE schema_version SET version = 7")
|
||||
|
||||
# Unique title index — always ensure it exists (safe to run after migrations
|
||||
# since the title column is guaranteed to exist at this point)
|
||||
@@ -362,13 +377,19 @@ class SessionDB:
|
||||
system_prompt: str = None,
|
||||
user_id: str = None,
|
||||
parent_session_id: str = None,
|
||||
profile: str = None,
|
||||
) -> str:
|
||||
"""Create a new session record. Returns the session_id."""
|
||||
"""Create a new session record. Returns the session_id.
|
||||
|
||||
Args:
|
||||
profile: Profile name for session isolation. When set, sessions
|
||||
are tagged so queries can filter by profile. (#323)
|
||||
"""
|
||||
def _do(conn):
|
||||
conn.execute(
|
||||
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
|
||||
system_prompt, parent_session_id, started_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
system_prompt, parent_session_id, profile, started_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(
|
||||
session_id,
|
||||
source,
|
||||
@@ -377,6 +398,7 @@ class SessionDB:
|
||||
json.dumps(model_config) if model_config else None,
|
||||
system_prompt,
|
||||
parent_session_id,
|
||||
profile,
|
||||
time.time(),
|
||||
),
|
||||
)
|
||||
@@ -505,19 +527,23 @@ class SessionDB:
|
||||
session_id: str,
|
||||
source: str = "unknown",
|
||||
model: str = None,
|
||||
profile: str = None,
|
||||
) -> None:
|
||||
"""Ensure a session row exists, creating it with minimal metadata if absent.
|
||||
|
||||
Used by _flush_messages_to_session_db to recover from a failed
|
||||
create_session() call (e.g. transient SQLite lock at agent startup).
|
||||
INSERT OR IGNORE is safe to call even when the row already exists.
|
||||
|
||||
Args:
|
||||
profile: Profile name for session isolation. (#323)
|
||||
"""
|
||||
def _do(conn):
|
||||
conn.execute(
|
||||
"""INSERT OR IGNORE INTO sessions
|
||||
(id, source, model, started_at)
|
||||
VALUES (?, ?, ?, ?)""",
|
||||
(session_id, source, model, time.time()),
|
||||
(id, source, model, profile, started_at)
|
||||
VALUES (?, ?, ?, ?, ?)""",
|
||||
(session_id, source, model, profile, time.time()),
|
||||
)
|
||||
self._execute_write(_do)
|
||||
|
||||
@@ -788,6 +814,7 @@ class SessionDB:
|
||||
limit: int = 20,
|
||||
offset: int = 0,
|
||||
include_children: bool = False,
|
||||
profile: str = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""List sessions with preview (first user message) and last active timestamp.
|
||||
|
||||
@@ -799,6 +826,10 @@ class SessionDB:
|
||||
|
||||
By default, child sessions (subagent runs, compression continuations)
|
||||
are excluded. Pass ``include_children=True`` to include them.
|
||||
|
||||
Args:
|
||||
profile: Filter sessions to this profile name. Pass None to see all.
|
||||
(#323)
|
||||
"""
|
||||
where_clauses = []
|
||||
params = []
|
||||
@@ -813,6 +844,9 @@ class SessionDB:
|
||||
placeholders = ",".join("?" for _ in exclude_sources)
|
||||
where_clauses.append(f"s.source NOT IN ({placeholders})")
|
||||
params.extend(exclude_sources)
|
||||
if profile:
|
||||
where_clauses.append("s.profile = ?")
|
||||
params.append(profile)
|
||||
|
||||
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
||||
query = f"""
|
||||
@@ -1158,34 +1192,52 @@ class SessionDB:
|
||||
source: str = None,
|
||||
limit: int = 20,
|
||||
offset: int = 0,
|
||||
profile: str = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""List sessions, optionally filtered by source."""
|
||||
"""List sessions, optionally filtered by source and profile.
|
||||
|
||||
Args:
|
||||
profile: Filter sessions to this profile name. Pass None to see all.
|
||||
(#323)
|
||||
"""
|
||||
where_clauses = []
|
||||
params = []
|
||||
if source:
|
||||
where_clauses.append("source = ?")
|
||||
params.append(source)
|
||||
if profile:
|
||||
where_clauses.append("profile = ?")
|
||||
params.append(profile)
|
||||
|
||||
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
||||
query = f"SELECT * FROM sessions {where_sql} ORDER BY started_at DESC LIMIT ? OFFSET ?"
|
||||
params.extend([limit, offset])
|
||||
with self._lock:
|
||||
if source:
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
|
||||
(source, limit, offset),
|
||||
)
|
||||
else:
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
|
||||
(limit, offset),
|
||||
)
|
||||
cursor = self._conn.execute(query, params)
|
||||
return [dict(row) for row in cursor.fetchall()]
|
||||
|
||||
# =========================================================================
|
||||
# Utility
|
||||
# =========================================================================
|
||||
|
||||
def session_count(self, source: str = None) -> int:
|
||||
"""Count sessions, optionally filtered by source."""
|
||||
def session_count(self, source: str = None, profile: str = None) -> int:
|
||||
"""Count sessions, optionally filtered by source and profile.
|
||||
|
||||
Args:
|
||||
profile: Filter to this profile name. Pass None to count all. (#323)
|
||||
"""
|
||||
where_clauses = []
|
||||
params = []
|
||||
if source:
|
||||
where_clauses.append("source = ?")
|
||||
params.append(source)
|
||||
if profile:
|
||||
where_clauses.append("profile = ?")
|
||||
params.append(profile)
|
||||
|
||||
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
||||
with self._lock:
|
||||
if source:
|
||||
cursor = self._conn.execute(
|
||||
"SELECT COUNT(*) FROM sessions WHERE source = ?", (source,)
|
||||
)
|
||||
else:
|
||||
cursor = self._conn.execute("SELECT COUNT(*) FROM sessions")
|
||||
cursor = self._conn.execute(f"SELECT COUNT(*) FROM sessions {where_sql}", params)
|
||||
return cursor.fetchone()[0]
|
||||
|
||||
def message_count(self, session_id: str = None) -> int:
|
||||
|
||||
@@ -1,177 +0,0 @@
|
||||
"""Tests for cron job stale error state clearing (#349)."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
# Ensure cron.jobs can be imported without triggering cron/__init__.py
|
||||
# which may have additional imports that break in test isolation.
|
||||
sys.modules.setdefault("cron.scheduler", MagicMock())
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def isolated_cron_dir(tmp_path, monkeypatch):
|
||||
"""Point CRON_DIR at a temp directory for test isolation."""
|
||||
cron_dir = tmp_path / "cron"
|
||||
cron_dir.mkdir()
|
||||
output_dir = cron_dir / "output"
|
||||
output_dir.mkdir()
|
||||
monkeypatch.setattr("cron.jobs.CRON_DIR", cron_dir)
|
||||
monkeypatch.setattr("cron.jobs.JOBS_FILE", cron_dir / "jobs.json")
|
||||
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", output_dir)
|
||||
return cron_dir
|
||||
|
||||
|
||||
def _make_job(cron_dir, job_id="test-job-1", last_status="error", last_error="auth revoked"):
|
||||
"""Write a minimal jobs.json with one job."""
|
||||
from hermes_time import now as _hermes_now
|
||||
job = {
|
||||
"id": job_id,
|
||||
"name": "Test Job",
|
||||
"prompt": "test prompt",
|
||||
"schedule": {"kind": "interval", "minutes": 10, "display": "every 10m"},
|
||||
"enabled": True,
|
||||
"state": "scheduled",
|
||||
"last_run_at": _hermes_now().isoformat(),
|
||||
"last_status": last_status,
|
||||
"last_error": last_error,
|
||||
"last_error_at": _hermes_now().isoformat(),
|
||||
"next_run_at": _hermes_now().isoformat(),
|
||||
}
|
||||
jobs_file = cron_dir / "jobs.json"
|
||||
jobs_file.write_text(json.dumps({"jobs": [job]}))
|
||||
return job
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# trigger_job clears stale error
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestTriggerJobClearsError:
|
||||
def test_trigger_clears_last_error(self, isolated_cron_dir):
|
||||
from cron.jobs import trigger_job, get_job
|
||||
|
||||
_make_job(isolated_cron_dir, last_status="error", last_error="Refresh session revoked")
|
||||
result = trigger_job("test-job-1")
|
||||
assert result is not None
|
||||
job = get_job("test-job-1")
|
||||
assert job["last_error"] is None
|
||||
assert job["last_status"] == "retrying"
|
||||
|
||||
def test_trigger_preserves_ok_status(self, isolated_cron_dir):
|
||||
from cron.jobs import trigger_job, get_job
|
||||
|
||||
_make_job(isolated_cron_dir, last_status="ok", last_error=None)
|
||||
result = trigger_job("test-job-1")
|
||||
assert result is not None
|
||||
job = get_job("test-job-1")
|
||||
assert job["last_error"] is None
|
||||
assert job["last_status"] == "ok" # stays ok, not retrying
|
||||
|
||||
def test_trigger_nonexistent_returns_none(self, isolated_cron_dir):
|
||||
from cron.jobs import trigger_job
|
||||
|
||||
result = trigger_job("nonexistent")
|
||||
assert result is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# resume_job also clears stale error
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestResumeJobClearsError:
|
||||
def test_resume_clears_last_error(self, isolated_cron_dir):
|
||||
from cron.jobs import resume_job, get_job, pause_job
|
||||
|
||||
_make_job(isolated_cron_dir, last_status="error", last_error="auth revoked")
|
||||
pause_job("test-job-1", reason="auth issue")
|
||||
result = resume_job("test-job-1")
|
||||
assert result is not None
|
||||
job = get_job("test-job-1")
|
||||
assert job["last_error"] is None
|
||||
assert job["last_status"] == "retrying"
|
||||
assert job["state"] == "scheduled"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# mark_job_run tracks timestamps
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMarkJobRunTimestamps:
|
||||
def test_success_sets_last_success_at(self, isolated_cron_dir):
|
||||
from cron.jobs import mark_job_run, get_job
|
||||
|
||||
_make_job(isolated_cron_dir)
|
||||
mark_job_run("test-job-1", success=True, error=None)
|
||||
job = get_job("test-job-1")
|
||||
assert job["last_status"] == "ok"
|
||||
assert job["last_error"] is None
|
||||
assert job.get("last_success_at") is not None
|
||||
|
||||
def test_error_sets_last_error_at(self, isolated_cron_dir):
|
||||
from cron.jobs import mark_job_run, get_job
|
||||
|
||||
_make_job(isolated_cron_dir, last_status="ok", last_error=None)
|
||||
mark_job_run("test-job-1", success=False, error="new error")
|
||||
job = get_job("test-job-1")
|
||||
assert job["last_status"] == "error"
|
||||
assert job["last_error"] == "new error"
|
||||
assert job.get("last_error_at") is not None
|
||||
|
||||
def test_success_clears_error(self, isolated_cron_dir):
|
||||
from cron.jobs import mark_job_run, get_job
|
||||
|
||||
_make_job(isolated_cron_dir, last_status="error", last_error="old error")
|
||||
mark_job_run("test-job-1", success=True, error=None)
|
||||
job = get_job("test-job-1")
|
||||
assert job["last_status"] == "ok"
|
||||
assert job["last_error"] is None
|
||||
assert job.get("last_success_at") is not None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Health distinction: error vs recovered
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestHealthDistinction:
|
||||
def test_recovered_job_shows_success_after_error(self, isolated_cron_dir):
|
||||
from cron.jobs import mark_job_run, get_job
|
||||
from hermes_time import now as _hermes_now
|
||||
|
||||
_make_job(isolated_cron_dir, last_status="error", last_error="auth revoked")
|
||||
# Simulate recovery
|
||||
mark_job_run("test-job-1", success=True, error=None)
|
||||
job = get_job("test-job-1")
|
||||
assert job["last_status"] == "ok"
|
||||
assert job["last_error"] is None
|
||||
# last_success_at should be after last_error_at
|
||||
assert job["last_success_at"] >= job.get("last_error_at", "")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _format_job includes health fields
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFormatJobHealth:
|
||||
def test_format_includes_health_fields(self):
|
||||
from tools.cronjob_tools import _format_job
|
||||
|
||||
job = {
|
||||
"id": "j1",
|
||||
"name": "Test",
|
||||
"prompt": "hello",
|
||||
"schedule_display": "every 10m",
|
||||
"last_status": "error",
|
||||
"last_error": "revoked",
|
||||
"last_error_at": "2026-04-13T10:00:00",
|
||||
"last_success_at": "2026-04-13T09:00:00",
|
||||
}
|
||||
result = _format_job(job)
|
||||
assert result["last_error"] == "revoked"
|
||||
assert result["last_error_at"] == "2026-04-13T10:00:00"
|
||||
assert result["last_success_at"] == "2026-04-13T09:00:00"
|
||||
@@ -196,9 +196,6 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"next_run_at": job.get("next_run_at"),
|
||||
"last_run_at": job.get("last_run_at"),
|
||||
"last_status": job.get("last_status"),
|
||||
"last_error": job.get("last_error"),
|
||||
"last_error_at": job.get("last_error_at"),
|
||||
"last_success_at": job.get("last_success_at"),
|
||||
"enabled": job.get("enabled", True),
|
||||
"state": job.get("state", "scheduled" if job.get("enabled", True) else "paused"),
|
||||
"paused_at": job.get("paused_at"),
|
||||
|
||||
Reference in New Issue
Block a user