Compare commits

...

7 Commits

Author SHA1 Message Date
Alexander Whitestone
4373ef2698 perf(cron): parallel job execution + priority sorting (#353)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 20s
2026-04-13 04:21:14 -04:00
a547552ff7 Merge pull request 'fix(cron): guard against interpreter shutdown in run_job() and tick()' (#355) from fix/cron-interpreter-shutdown-352 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 27s
Merge PR #355: fix(cron): guard against interpreter shutdown in run_job() and tick()
2026-04-13 07:32:06 +00:00
Alexander Whitestone
d6bd3bc10a fix(cron): guard against interpreter shutdown in run_job() and tick()
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 27s
Fixes #352

Problem: When the gateway restarts, Python's interpreter enters
shutdown phase while the last cron tick is still processing jobs.
ThreadPoolExecutor.submit() raises RuntimeError("cannot schedule
new futures after interpreter shutdown") for every remaining job.
This cascades through the entire tick queue.

Fix (two-part):
1. run_job(): Wrap ThreadPoolExecutor creation + submit in try/except.
   On RuntimeError, fall back to synchronous execution (same thread)
   so the job at least attempts instead of dying silently.
2. tick(): Check sys.is_finalizing() before each job. If the
   interpreter is shutting down, stop processing immediately
   instead of wasting time on doomed ThreadPoolExecutor.submit() calls.
2026-04-13 03:22:10 -04:00
7a577068f0 Merge pull request 'fix(cron): ensure ticker thread starts and monitor for death (#342)' (#345) from fix/cron-ticker-startup into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 25s
Auto-merge #345
2026-04-13 07:15:28 +00:00
Alexander Whitestone
cb9214cae0 fix(cron): ensure ticker thread starts and monitor for death
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 27s
Issue #342: Cron ticker thread not starting in gateway

Root cause: asyncio.get_running_loop() can raise RuntimeError in edge cases,
and ticker thread can die silently without restart.

Fix:
1. Wrap get_running_loop() in try/except with fallback
2. Add explicit logger.info when ticker starts
3. Add async monitor that restarts ticker if it dies
4. Log PID and thread name for debugging
2026-04-13 03:02:36 -04:00
eecff3fbf6 Merge pull request 'ci: add skills index workflow (rescued from #307)' (#335) from feat/skills-index-workflow into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 26s
2026-04-13 04:26:28 +00:00
Alexander Whitestone
4210412bef ci: add skills index workflow
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 37s
2026-04-13 00:23:59 -04:00
4 changed files with 208 additions and 14 deletions

View File

@@ -41,11 +41,19 @@ jobs:
python-version: '3.11'
- name: Install PyYAML for skill extraction
run: pip install pyyaml
run: pip install pyyaml httpx
- name: Extract skill metadata for dashboard
run: python3 website/scripts/extract-skills.py
- name: Build skills index (if not already present)
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
if [ ! -f website/static/api/skills-index.json ]; then
python3 scripts/build_skills_index.py || echo "Skills index build failed (non-fatal)"
fi
- name: Install dependencies
run: npm ci
working-directory: website

101
.github/workflows/skills-index.yml vendored Normal file
View File

@@ -0,0 +1,101 @@
name: Build Skills Index
on:
schedule:
# Run twice daily: 6 AM and 6 PM UTC
- cron: '0 6,18 * * *'
workflow_dispatch: # Manual trigger
push:
branches: [main]
paths:
- 'scripts/build_skills_index.py'
- '.github/workflows/skills-index.yml'
permissions:
contents: read
jobs:
build-index:
# Only run on the upstream repository, not on forks
if: github.repository == 'NousResearch/hermes-agent'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install httpx pyyaml
- name: Build skills index
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: python scripts/build_skills_index.py
- name: Upload index artifact
uses: actions/upload-artifact@v4
with:
name: skills-index
path: website/static/api/skills-index.json
retention-days: 7
deploy-with-index:
needs: build-index
runs-on: ubuntu-latest
permissions:
pages: write
id-token: write
environment:
name: github-pages
url: ${{ steps.deploy.outputs.page_url }}
# Only deploy on schedule or manual trigger (not on every push to the script)
if: github.event_name == 'schedule' || github.event_name == 'workflow_dispatch'
steps:
- uses: actions/checkout@v4
- uses: actions/download-artifact@v4
with:
name: skills-index
path: website/static/api/
- uses: actions/setup-node@v4
with:
node-version: 20
cache: npm
cache-dependency-path: website/package-lock.json
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install PyYAML for skill extraction
run: pip install pyyaml
- name: Extract skill metadata for dashboard
run: python3 website/scripts/extract-skills.py
- name: Install dependencies
run: npm ci
working-directory: website
- name: Build Docusaurus
run: npm run build
working-directory: website
- name: Stage deployment
run: |
mkdir -p _site/docs
cp -r landingpage/* _site/
cp -r website/build/* _site/docs/
echo "hermes-agent.nousresearch.com" > _site/CNAME
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
path: _site
- name: Deploy to GitHub Pages
id: deploy
uses: actions/deploy-pages@v4

View File

@@ -628,8 +628,47 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
_POLL_INTERVAL = 5.0
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
# Guard against interpreter shutdown: ThreadPoolExecutor.submit()
# raises RuntimeError("cannot schedule new futures after interpreter
# shutdown") when Python is finalizing (e.g. gateway restart races).
# Fall back to synchronous execution so the job at least attempts.
_cron_pool = None
try:
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
except RuntimeError:
logger.warning(
"Job '%s': ThreadPoolExecutor unavailable (interpreter shutdown?) "
"— falling back to synchronous execution",
job_name,
)
if _cron_pool is not None:
try:
_cron_pool.shutdown(wait=False)
except Exception:
pass
_cron_pool = None
result = agent.run_conversation(prompt)
final_response = result.get("final_response", "") or ""
logged_response = final_response if final_response else "(No response generated)"
output = f"""# Cron Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Prompt
{prompt}
## Response
{logged_response}
"""
logger.info("Job '%s' completed (sync fallback)", job_name)
return True, output, final_response, None
_inactivity_timeout = False
try:
if _cron_inactivity_limit is None:
@@ -656,10 +695,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_inactivity_timeout = True
break
except Exception:
_cron_pool.shutdown(wait=False, cancel_futures=True)
if _cron_pool is not None:
_cron_pool.shutdown(wait=False, cancel_futures=True)
raise
finally:
_cron_pool.shutdown(wait=False)
if _cron_pool is not None:
_cron_pool.shutdown(wait=False)
if _inactivity_timeout:
# Build diagnostic summary from the agent's activity tracker.
@@ -798,16 +839,36 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
if verbose:
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
executed = 0
for job in due_jobs:
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
# process crashes mid-run, the job won't re-fire on restart.
# One-shot jobs are left alone so they can retry on restart.
advance_next_run(job["id"])
# Parallel job execution with priority sorting
PRIORITY_ORDER = {"critical": 0, "high": 1, "normal": 2, "low": 3}
due_jobs_sorted = sorted(due_jobs, key=lambda j: PRIORITY_ORDER.get(j.get("priority", "normal"), 2))
MAX_PARALLEL = int(os.environ.get("HERMES_CRON_MAX_PARALLEL", "10"))
executed = 0
_job_results = []
def _process_single_job(job):
job_name = job.get("name", job["id"])
if sys.is_finalizing():
return None
try:
advance_next_run(job["id"])
success, output, final_response, error = run_job(job)
return (job, success, output, final_response, error)
except Exception as e:
logger.error("Job '%s': parallel error: %s", job_name, e)
return (job, False, "", None, str(e))
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PARALLEL) as executor:
futures = {executor.submit(_process_single_job, job): job for job in due_jobs_sorted}
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result is None:
continue
_job_results.append(result)
for job, success, output, final_response, error in _job_results:
executed += 1
output_file = save_job_output(job["id"], output)
if verbose:

View File

@@ -7496,17 +7496,41 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
# Start background cron ticker so scheduled jobs fire automatically.
# Pass the event loop so cron delivery can use live adapters (E2EE support).
cron_stop = threading.Event()
try:
_cron_loop = asyncio.get_running_loop()
except RuntimeError:
_cron_loop = None
logger.warning("No running event loop — cron ticker will run without adapter delivery")
cron_thread = threading.Thread(
target=_start_cron_ticker,
args=(cron_stop,),
kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()},
kwargs={"adapters": runner.adapters, "loop": _cron_loop},
daemon=True,
name="cron-ticker",
)
cron_thread.start()
logger.info("Cron ticker thread started (pid=%d, thread=%s)", os.getpid(), cron_thread.name)
# Monitor ticker thread — restart if it dies unexpectedly
async def _monitor_ticker():
while not cron_stop.is_set():
if not cron_thread.is_alive():
logger.warning("Cron ticker thread died — restarting")
cron_thread2 = threading.Thread(
target=_start_cron_ticker,
args=(cron_stop,),
kwargs={"adapters": runner.adapters, "loop": _cron_loop},
daemon=True,
name="cron-ticker-restart",
)
cron_thread2.start()
logger.info("Cron ticker thread restarted")
await asyncio.sleep(30)
monitor_task = asyncio.create_task(_monitor_ticker())
# Wait for shutdown
await runner.wait_for_shutdown()
monitor_task.cancel()
if runner.should_exit_with_failure:
if runner.exit_reason: