Compare commits
3 Commits
feat/deplo
...
ci/fix-mem
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9323f5a580 | ||
| a547552ff7 | |||
|
|
d6bd3bc10a |
@@ -628,8 +628,47 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|||||||
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
|
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
|
||||||
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
|
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
|
||||||
_POLL_INTERVAL = 5.0
|
_POLL_INTERVAL = 5.0
|
||||||
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
|
||||||
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
|
# Guard against interpreter shutdown: ThreadPoolExecutor.submit()
|
||||||
|
# raises RuntimeError("cannot schedule new futures after interpreter
|
||||||
|
# shutdown") when Python is finalizing (e.g. gateway restart races).
|
||||||
|
# Fall back to synchronous execution so the job at least attempts.
|
||||||
|
_cron_pool = None
|
||||||
|
try:
|
||||||
|
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||||
|
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
|
||||||
|
except RuntimeError:
|
||||||
|
logger.warning(
|
||||||
|
"Job '%s': ThreadPoolExecutor unavailable (interpreter shutdown?) "
|
||||||
|
"— falling back to synchronous execution",
|
||||||
|
job_name,
|
||||||
|
)
|
||||||
|
if _cron_pool is not None:
|
||||||
|
try:
|
||||||
|
_cron_pool.shutdown(wait=False)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
_cron_pool = None
|
||||||
|
result = agent.run_conversation(prompt)
|
||||||
|
final_response = result.get("final_response", "") or ""
|
||||||
|
logged_response = final_response if final_response else "(No response generated)"
|
||||||
|
output = f"""# Cron Job: {job_name}
|
||||||
|
|
||||||
|
**Job ID:** {job_id}
|
||||||
|
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||||
|
**Schedule:** {job.get('schedule_display', 'N/A')}
|
||||||
|
|
||||||
|
## Prompt
|
||||||
|
|
||||||
|
{prompt}
|
||||||
|
|
||||||
|
## Response
|
||||||
|
|
||||||
|
{logged_response}
|
||||||
|
"""
|
||||||
|
logger.info("Job '%s' completed (sync fallback)", job_name)
|
||||||
|
return True, output, final_response, None
|
||||||
|
|
||||||
_inactivity_timeout = False
|
_inactivity_timeout = False
|
||||||
try:
|
try:
|
||||||
if _cron_inactivity_limit is None:
|
if _cron_inactivity_limit is None:
|
||||||
@@ -656,10 +695,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|||||||
_inactivity_timeout = True
|
_inactivity_timeout = True
|
||||||
break
|
break
|
||||||
except Exception:
|
except Exception:
|
||||||
_cron_pool.shutdown(wait=False, cancel_futures=True)
|
if _cron_pool is not None:
|
||||||
|
_cron_pool.shutdown(wait=False, cancel_futures=True)
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
_cron_pool.shutdown(wait=False)
|
if _cron_pool is not None:
|
||||||
|
_cron_pool.shutdown(wait=False)
|
||||||
|
|
||||||
if _inactivity_timeout:
|
if _inactivity_timeout:
|
||||||
# Build diagnostic summary from the agent's activity tracker.
|
# Build diagnostic summary from the agent's activity tracker.
|
||||||
@@ -800,6 +841,16 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
|||||||
|
|
||||||
executed = 0
|
executed = 0
|
||||||
for job in due_jobs:
|
for job in due_jobs:
|
||||||
|
# If the interpreter is shutting down (e.g. gateway restart),
|
||||||
|
# stop processing immediately — ThreadPoolExecutor.submit()
|
||||||
|
# will raise RuntimeError for every remaining job.
|
||||||
|
if sys.is_finalizing():
|
||||||
|
logger.warning(
|
||||||
|
"Interpreter finalizing — skipping %d remaining job(s)",
|
||||||
|
len(due_jobs) - executed,
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# For recurring jobs (cron/interval), advance next_run_at to the
|
# For recurring jobs (cron/interval), advance next_run_at to the
|
||||||
# next future occurrence BEFORE execution. This way, if the
|
# next future occurrence BEFORE execution. This way, if the
|
||||||
|
|||||||
@@ -172,13 +172,13 @@ class MemPalacePortalProvider(MemoryProvider):
|
|||||||
|
|
||||||
# Extract key facts for Hologram
|
# Extract key facts for Hologram
|
||||||
if self._hologram_store:
|
if self._hologram_store:
|
||||||
# Simple heuristic: look for \"I prefer\", \"The project uses\", etc.
|
# Simple heuristic: look for "I prefer", "The project uses", etc.
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
if msg.get(\"role\") == \"user\":
|
if msg.get("role") == "user":
|
||||||
content = msg.get(\"content\", \"\")
|
content = msg.get("content", "")
|
||||||
if \"prefer\" in content.lower() or \"use\" in content.lower():
|
if "prefer" in content.lower() or "use" in content.lower():
|
||||||
try:
|
try:
|
||||||
self._hologram_store.add_fact(content[:200], category=\"user_pref\")
|
self._hologram_store.add_fact(content[:200], category="user_pref")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user