Compare commits

...

5 Commits

Author SHA1 Message Date
Alexander Whitestone
9323f5a580 fix: unescape corrupted quotes in mempalace __init__.py
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 26s
The file had literal escaped quotes (backslash-quote) throughout,
causing SyntaxError on import. Replaced with proper quotes.
2026-04-13 09:57:26 -04:00
a547552ff7 Merge pull request 'fix(cron): guard against interpreter shutdown in run_job() and tick()' (#355) from fix/cron-interpreter-shutdown-352 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 27s
Merge PR #355: fix(cron): guard against interpreter shutdown in run_job() and tick()
2026-04-13 07:32:06 +00:00
Alexander Whitestone
d6bd3bc10a fix(cron): guard against interpreter shutdown in run_job() and tick()
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 27s
Fixes #352

Problem: When the gateway restarts, Python's interpreter enters
shutdown phase while the last cron tick is still processing jobs.
ThreadPoolExecutor.submit() raises RuntimeError("cannot schedule
new futures after interpreter shutdown") for every remaining job.
This cascades through the entire tick queue.

Fix (two-part):
1. run_job(): Wrap ThreadPoolExecutor creation + submit in try/except.
   On RuntimeError, fall back to synchronous execution (same thread)
   so the job at least attempts instead of dying silently.
2. tick(): Check sys.is_finalizing() before each job. If the
   interpreter is shutting down, stop processing immediately
   instead of wasting time on doomed ThreadPoolExecutor.submit() calls.
2026-04-13 03:22:10 -04:00
7a577068f0 Merge pull request 'fix(cron): ensure ticker thread starts and monitor for death (#342)' (#345) from fix/cron-ticker-startup into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 25s
Auto-merge #345
2026-04-13 07:15:28 +00:00
Alexander Whitestone
cb9214cae0 fix(cron): ensure ticker thread starts and monitor for death
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 27s
Issue #342: Cron ticker thread not starting in gateway

Root cause: asyncio.get_running_loop() can raise RuntimeError in edge cases,
and ticker thread can die silently without restart.

Fix:
1. Wrap get_running_loop() in try/except with fallback
2. Add explicit logger.info when ticker starts
3. Add async monitor that restarts ticker if it dies
4. Log PID and thread name for debugging
2026-04-13 03:02:36 -04:00
3 changed files with 85 additions and 10 deletions

View File

@@ -628,8 +628,47 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
_POLL_INTERVAL = 5.0
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
# Guard against interpreter shutdown: ThreadPoolExecutor.submit()
# raises RuntimeError("cannot schedule new futures after interpreter
# shutdown") when Python is finalizing (e.g. gateway restart races).
# Fall back to synchronous execution so the job at least attempts.
_cron_pool = None
try:
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
except RuntimeError:
logger.warning(
"Job '%s': ThreadPoolExecutor unavailable (interpreter shutdown?) "
"— falling back to synchronous execution",
job_name,
)
if _cron_pool is not None:
try:
_cron_pool.shutdown(wait=False)
except Exception:
pass
_cron_pool = None
result = agent.run_conversation(prompt)
final_response = result.get("final_response", "") or ""
logged_response = final_response if final_response else "(No response generated)"
output = f"""# Cron Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Prompt
{prompt}
## Response
{logged_response}
"""
logger.info("Job '%s' completed (sync fallback)", job_name)
return True, output, final_response, None
_inactivity_timeout = False
try:
if _cron_inactivity_limit is None:
@@ -656,10 +695,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_inactivity_timeout = True
break
except Exception:
_cron_pool.shutdown(wait=False, cancel_futures=True)
if _cron_pool is not None:
_cron_pool.shutdown(wait=False, cancel_futures=True)
raise
finally:
_cron_pool.shutdown(wait=False)
if _cron_pool is not None:
_cron_pool.shutdown(wait=False)
if _inactivity_timeout:
# Build diagnostic summary from the agent's activity tracker.
@@ -800,6 +841,16 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
executed = 0
for job in due_jobs:
# If the interpreter is shutting down (e.g. gateway restart),
# stop processing immediately — ThreadPoolExecutor.submit()
# will raise RuntimeError for every remaining job.
if sys.is_finalizing():
logger.warning(
"Interpreter finalizing — skipping %d remaining job(s)",
len(due_jobs) - executed,
)
break
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the

View File

@@ -7496,17 +7496,41 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
# Start background cron ticker so scheduled jobs fire automatically.
# Pass the event loop so cron delivery can use live adapters (E2EE support).
cron_stop = threading.Event()
try:
_cron_loop = asyncio.get_running_loop()
except RuntimeError:
_cron_loop = None
logger.warning("No running event loop — cron ticker will run without adapter delivery")
cron_thread = threading.Thread(
target=_start_cron_ticker,
args=(cron_stop,),
kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()},
kwargs={"adapters": runner.adapters, "loop": _cron_loop},
daemon=True,
name="cron-ticker",
)
cron_thread.start()
logger.info("Cron ticker thread started (pid=%d, thread=%s)", os.getpid(), cron_thread.name)
# Monitor ticker thread — restart if it dies unexpectedly
async def _monitor_ticker():
while not cron_stop.is_set():
if not cron_thread.is_alive():
logger.warning("Cron ticker thread died — restarting")
cron_thread2 = threading.Thread(
target=_start_cron_ticker,
args=(cron_stop,),
kwargs={"adapters": runner.adapters, "loop": _cron_loop},
daemon=True,
name="cron-ticker-restart",
)
cron_thread2.start()
logger.info("Cron ticker thread restarted")
await asyncio.sleep(30)
monitor_task = asyncio.create_task(_monitor_ticker())
# Wait for shutdown
await runner.wait_for_shutdown()
monitor_task.cancel()
if runner.should_exit_with_failure:
if runner.exit_reason:

View File

@@ -172,13 +172,13 @@ class MemPalacePortalProvider(MemoryProvider):
# Extract key facts for Hologram
if self._hologram_store:
# Simple heuristic: look for \"I prefer\", \"The project uses\", etc.
# Simple heuristic: look for "I prefer", "The project uses", etc.
for msg in messages:
if msg.get(\"role\") == \"user\":
content = msg.get(\"content\", \"\")
if \"prefer\" in content.lower() or \"use\" in content.lower():
if msg.get("role") == "user":
content = msg.get("content", "")
if "prefer" in content.lower() or "use" in content.lower():
try:
self._hologram_store.add_fact(content[:200], category=\"user_pref\")
self._hologram_store.add_fact(content[:200], category="user_pref")
except Exception:
pass