Compare commits

1 Commit

Author SHA1 Message Date
Alexander Whitestone
c325a70ddb feat: profile-scoped cron with parallel execution (#334)
All checks were successful
Lint / lint (pull_request) Successful in 17s
- Add optional profile field to cron jobs (jobs.py, cronjob_tools.py)
- Jobs with a profile run inside that profile's HERMES_HOME context
  using a new _profile_context() context manager in scheduler.py
- tick() now groups due jobs by profile and executes profiles in
  parallel via ThreadPoolExecutor, while jobs within a profile run
  sequentially to avoid config/env contamination
- CLI: add --profile to hermes cron create and cron edit
- Display profile in hermes cron list output
- Tests: profile context env switching, parallel tick grouping

Closes #334
2026-04-22 03:31:37 -04:00
11 changed files with 203 additions and 689 deletions
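For orientation, a minimal usage sketch of the new flag. Only `--profile` is confirmed by this diff; the other flags and the `research`/`ops` profile names are illustrative assumptions:

```bash
# Create a job that runs under the "research" profile's config.yaml and .env
hermes cron create --prompt "Summarize overnight alerts" \
    --schedule "0 9 * * *" --profile research

# Move an existing job to another profile (job id is a placeholder)
hermes cron edit <job-id> --profile ops

# The listing now prints a "Profile:" line for profile-scoped jobs
hermes cron list
```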

View File

@@ -378,6 +378,7 @@ def create_job(
    provider: Optional[str] = None,
    base_url: Optional[str] = None,
    script: Optional[str] = None,
    profile: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Create a new cron job.
@@ -427,6 +428,8 @@ def create_job(
    normalized_base_url = normalized_base_url or None
    normalized_script = str(script).strip() if isinstance(script, str) else None
    normalized_script = normalized_script or None
    normalized_profile = str(profile).strip() if isinstance(profile, str) else None
    normalized_profile = normalized_profile or None
    label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job"
    job = {
@@ -439,6 +442,7 @@
        "provider": normalized_provider,
        "base_url": normalized_base_url,
        "script": normalized_script,
        "profile": normalized_profile,
        "schedule": parsed_schedule,
        "schedule_display": parsed_schedule.get("display", schedule),
        "repeat": {

View File

@@ -15,6 +15,7 @@ import logging
import os
import subprocess
import sys
from contextlib import contextmanager

# fcntl is Unix-only; on Windows use msvcrt for file locking
try:
@@ -26,7 +27,7 @@ except ImportError:
except ImportError:
    msvcrt = None
from pathlib import Path
from typing import Optional, Dict, List, Any

# Add parent directory to path for imports BEFORE repo-level imports.
# Without this, standalone invocations (e.g. after `hermes update` reloads
@@ -39,6 +40,46 @@ from hermes_time import now as _hermes_now
logger = logging.getLogger(__name__)

# Profile-scoped cron execution (#334)
# Each job can specify a profile; the scheduler switches HERMES_HOME to that
# profile's directory before execution, then restores it afterward.
@contextmanager
def _profile_context(profile_name: Optional[str]):
    """
    Temporarily switch HERMES_HOME to a profile's directory.

    If profile_name is None or 'default', this is a no-op.
    """
    if not profile_name or profile_name == "default":
        yield
        return
    from hermes_cli.profiles import get_profile_dir
    profile_dir = get_profile_dir(profile_name)
    if not profile_dir.exists():
        logger.warning("Profile '%s' does not exist at %s — running in default context", profile_name, profile_dir)
        yield
        return
    old_home = os.environ.get("HERMES_HOME")
    old_profile = os.environ.get("HERMES_ACTIVE_PROFILE")
    try:
        os.environ["HERMES_HOME"] = str(profile_dir)
        os.environ["HERMES_ACTIVE_PROFILE"] = profile_name
        logger.info("Switched to profile '%s' (HERMES_HOME=%s)", profile_name, profile_dir)
        yield
    finally:
        if old_home is not None:
            os.environ["HERMES_HOME"] = old_home
        elif "HERMES_HOME" in os.environ:
            del os.environ["HERMES_HOME"]
        if old_profile is not None:
            os.environ["HERMES_ACTIVE_PROFILE"] = old_profile
        elif "HERMES_ACTIVE_PROFILE" in os.environ:
            del os.environ["HERMES_ACTIVE_PROFILE"]

# Valid delivery platforms — used to validate user-supplied platform names
# in cron delivery targets, preventing env var enumeration via crafted names.
_KNOWN_DELIVERY_PLATFORMS = frozenset({
@@ -585,6 +626,15 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
    """
    profile = job.get("profile")
    with _profile_context(profile):
        return _run_job_inner(job)


def _run_job_inner(job: dict) -> tuple[bool, str, str, Optional[str]]:
    """Core job execution (inside profile context)."""
    from run_agent import AIAgent
    # Initialize SQLite session store so cron job messages are persisted
    # and discoverable via session_search (same pattern as gateway/run.py).
    _session_db = None
@@ -939,44 +989,71 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
    if verbose:
        logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
    executed = 0

    # Group jobs by profile. Jobs within the same profile run sequentially
    # to avoid HERMES_HOME/config/env contamination. Different profiles run
    # in parallel via ThreadPoolExecutor.
    profile_groups: Dict[Optional[str], List[Dict[str, Any]]] = {}
    for job in due_jobs:
        profile = job.get("profile")
        profile_groups.setdefault(profile, []).append(job)

    def _run_profile_jobs(profile_name: Optional[str], jobs: List[Dict[str, Any]]) -> int:
        """Run all jobs for a single profile sequentially."""
        count = 0
        for job in jobs:
            try:
                # For recurring jobs (cron/interval), advance next_run_at to the
                # next future occurrence BEFORE execution. This way, if the
                # process crashes mid-run, the job won't re-fire on restart.
                # One-shot jobs are left alone so they can retry on restart.
                advance_next_run(job["id"])
                success, output, final_response, error = run_job(job)
                output_file = save_job_output(job["id"], output)
                if verbose:
                    logger.info("Output saved to: %s", output_file)
                # Deliver the final response to the origin/target chat.
                # If the agent responded with [SILENT], skip delivery (but
                # output is already saved above). Failed jobs always deliver.
                deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
                should_deliver = bool(deliver_content)
                if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
                    logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
                    should_deliver = False
                delivery_error = None
                if should_deliver:
                    try:
                        delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
                    except Exception as de:
                        delivery_error = str(de)
                        logger.error("Delivery failed for job %s: %s", job["id"], de)
                mark_job_run(job["id"], success, error, delivery_error=delivery_error)
                count += 1
            except Exception as e:
                logger.error("Error processing job %s: %s", job['id'], e)
                mark_job_run(job["id"], False, str(e))
        return count

    # Execute profiles in parallel
    if len(profile_groups) == 1:
        # Single profile — no need for thread pool overhead
        profile, jobs = next(iter(profile_groups.items()))
        executed = _run_profile_jobs(profile, jobs)
    else:
        max_workers = min(len(profile_groups), 8)
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(_run_profile_jobs, profile, jobs): profile
                for profile, jobs in profile_groups.items()
            }
            for future in concurrent.futures.as_completed(futures):
                profile = futures[future]
                try:
                    count = future.result()
                    executed += count
                    if verbose:
                        logger.info("Profile '%s': %s job(s) executed", profile or "default", count)
                except Exception as e:
                    logger.error("Profile '%s' execution failed: %s", profile or "default", e)
    return executed
finally:
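As a self-contained sketch of the fan-out pattern above (simplified names and toy job dicts, not the scheduler's actual code):

```python
import concurrent.futures
from typing import Any, Dict, List, Optional

def run_grouped(jobs: List[Dict[str, Any]]) -> int:
    # Bucket due jobs by profile; a missing/None profile means "default".
    groups: Dict[Optional[str], List[Dict[str, Any]]] = {}
    for job in jobs:
        groups.setdefault(job.get("profile"), []).append(job)

    def run_sequentially(batch: List[Dict[str, Any]]) -> int:
        # Jobs sharing a profile run back-to-back, so HERMES_HOME and the
        # profile's config are never mutated by two jobs at once.
        return len(batch)

    # Distinct profiles fan out across threads (capped at 8 workers).
    with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(groups), 8)) as pool:
        futures = [pool.submit(run_sequentially, batch) for batch in groups.values()]
        return sum(f.result() for f in concurrent.futures.as_completed(futures))

# Two "alpha" jobs share one worker thread; "beta" runs concurrently on another.
print(run_grouped([{"profile": "alpha"}, {"profile": "alpha"}, {"profile": "beta"}]))  # 3
```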

View File

@@ -1,68 +0,0 @@
# RAGFlow integration

This repo-side slice adds:

- `tools/ragflow_tool.py`
  - `ragflow_ingest(document_url, dataset)`
  - `ragflow_query(query, dataset, limit=5)`
- `scripts/ragflow_bootstrap.py`
  - fetches the upstream RAGFlow Docker bundle
  - runs `docker compose --profile cpu up -d` or `gpu`

## Deployment

Bootstrap the upstream CPU stack locally:

```bash
python3 scripts/ragflow_bootstrap.py --profile cpu
```

Dry-run only:

```bash
python3 scripts/ragflow_bootstrap.py --profile cpu --dry-run
```

Fetch files without launching Docker:

```bash
python3 scripts/ragflow_bootstrap.py --no-up
```

Default bundle target:

- `~/.hermes/services/ragflow`

## Runtime configuration

Optional environment variables:

- `RAGFLOW_API_URL` — defaults to `http://localhost:9380`
- `RAGFLOW_API_KEY` — Bearer token for authenticated RAGFlow APIs

## Supported document types

RAGFlow ingest accepts:

- PDF: `.pdf`
- Word: `.doc`, `.docx`
- Presentations: `.ppt`, `.pptx`
- Images via OCR: `.png`, `.jpg`, `.jpeg`, `.webp`, `.bmp`, `.tif`, `.tiff`, `.gif`
- Text and codebase documents: `.txt`, `.md`, `.rst`, `.html`, `.json`, `.yaml`, `.yml`, `.toml`, `.ini`, `.py`, `.js`, `.ts`, `.tsx`, `.jsx`, `.java`, `.go`, `.rs`, `.c`, `.cpp`, `.h`, `.hpp`, `.rb`, `.php`, `.sql`, `.sh`

## Example tool usage

`ragflow_ingest`:

```json
{"document_url":"https://arxiv.org/pdf/1706.03762.pdf","dataset":"research-papers"}
```

`ragflow_query`:

```json
{"query":"What does the paper say about attention heads?","dataset":"research-papers","limit":5}
```

## Use cases

- research papers
- technical documentation
- OCR-heavy image workflows
- ingested codebases and architecture docs

View File

@@ -88,6 +88,8 @@ def cron_list(show_all: bool = False):
        print(f" Repeat: {repeat_str}")
        print(f" Next run: {next_run}")
        print(f" Deliver: {deliver_str}")
        if job.get("profile"):
            print(f" Profile: {job['profile']}")
        if skills:
            print(f" Skills: {', '.join(skills)}")
        script = job.get("script")
@@ -168,6 +170,7 @@ def cron_create(args):
        skill=getattr(args, "skill", None),
        skills=_normalize_skills(getattr(args, "skill", None), getattr(args, "skills", None)),
        script=getattr(args, "script", None),
        profile=getattr(args, "profile", None),
    )
    if not result.get("success"):
        print(color(f"Failed to create job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -218,6 +221,7 @@ def cron_edit(args):
        repeat=getattr(args, "repeat", None),
        skills=final_skills,
        script=getattr(args, "script", None),
        profile=getattr(args, "profile", None),
    )
    if not result.get("success"):
        print(color(f"Failed to update job: {result.get('error', 'unknown error')}", Colors.RED))

View File

@@ -4959,6 +4959,7 @@ For more help on a command:
    cron_create.add_argument("--repeat", type=int, help="Optional repeat count")
    cron_create.add_argument("--skill", dest="skills", action="append", help="Attach a skill. Repeat to add multiple skills.")
    cron_create.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run")
    cron_create.add_argument("--profile", help="Profile to run job in (uses profile's config.yaml and .env)")

    # cron edit
    cron_edit = cron_subparsers.add_parser("edit", help="Edit an existing scheduled job")
@@ -4973,6 +4974,7 @@ For more help on a command:
    cron_edit.add_argument("--remove-skill", dest="remove_skills", action="append", help="Remove a specific attached skill. Repeatable.")
    cron_edit.add_argument("--clear-skills", action="store_true", help="Remove all attached skills from the job")
    cron_edit.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run. Pass empty string to clear.")
    cron_edit.add_argument("--profile", help="Profile to run job in (uses profile's config.yaml and .env)")

    # lifecycle actions
    cron_pause = cron_subparsers.add_parser("pause", help="Pause a scheduled job")

View File

@@ -1,79 +0,0 @@
#!/usr/bin/env python3
"""Bootstrap an upstream RAGFlow Docker bundle for Hermes.

This script fetches the upstream RAGFlow docker bundle into a local directory
so operators can run `docker compose --profile cpu up -d` (or `gpu`) without
manually assembling the required files.
"""
from __future__ import annotations

import argparse
import subprocess
import urllib.request
from pathlib import Path

UPSTREAM_BASE = "https://raw.githubusercontent.com/infiniflow/ragflow/main/docker"
UPSTREAM_FILES = {
    "docker-compose.yml": f"{UPSTREAM_BASE}/docker-compose.yml",
    "docker-compose-base.yml": f"{UPSTREAM_BASE}/docker-compose-base.yml",
    ".env": f"{UPSTREAM_BASE}/.env",
    "service_conf.yaml.template": f"{UPSTREAM_BASE}/service_conf.yaml.template",
    "entrypoint.sh": f"{UPSTREAM_BASE}/entrypoint.sh",
}


def materialize_bundle(target_dir: str | Path, overwrite: bool = False) -> list[Path]:
    target = Path(target_dir).expanduser()
    target.mkdir(parents=True, exist_ok=True)
    written: list[Path] = []
    for name, url in UPSTREAM_FILES.items():
        dest = target / name
        if dest.exists() and not overwrite:
            written.append(dest)
            continue
        with urllib.request.urlopen(url, timeout=60) as response:
            dest.write_bytes(response.read())
        if name == "entrypoint.sh":
            dest.chmod(0o755)
        written.append(dest)
    return written


def build_compose_command(target_dir: str | Path, profile: str = "cpu") -> list[str]:
    return ["docker", "compose", "--profile", profile, "up", "-d"]


def run_compose(target_dir: str | Path, profile: str = "cpu", dry_run: bool = False) -> dict:
    target = Path(target_dir).expanduser()
    command = build_compose_command(target, profile=profile)
    if dry_run:
        return {"target_dir": str(target), "command": command, "executed": False}
    subprocess.run(command, cwd=target, check=True)
    return {"target_dir": str(target), "command": command, "executed": True}


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(description="Fetch and launch the upstream RAGFlow Docker bundle")
    parser.add_argument("--target-dir", default=str(Path.home() / ".hermes" / "services" / "ragflow"))
    parser.add_argument("--profile", choices=["cpu", "gpu"], default="cpu")
    parser.add_argument("--overwrite", action="store_true")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--no-up", action="store_true", help="Only fetch bundle files; do not run docker compose")
    args = parser.parse_args(argv)

    written = materialize_bundle(args.target_dir, overwrite=args.overwrite)
    print(f"Fetched {len(written)} RAGFlow docker files into {Path(args.target_dir).expanduser()}")
    if args.no_up:
        return 0
    result = run_compose(args.target_dir, profile=args.profile, dry_run=args.dry_run)
    print("Command:", " ".join(result["command"]))
    if result["executed"]:
        print("RAGFlow docker stack launch requested.")
    else:
        print("Dry run only; docker compose not executed.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -1217,3 +1217,80 @@ class TestSendMediaViaAdapter:
        self._run_with_loop(adapter, "123", media_files, None, {"id": "j4"})
        adapter.send_voice.assert_called_once()
        adapter.send_image_file.assert_called_once()


class TestProfileContext:
    """Tests for profile-scoped cron execution (#334)."""

    def test_profile_context_noop_for_none(self):
        from cron.scheduler import _profile_context

        with _profile_context(None):
            assert os.environ.get("HERMES_ACTIVE_PROFILE") is None

    def test_profile_context_noop_for_default(self):
        from cron.scheduler import _profile_context

        with _profile_context("default"):
            assert os.environ.get("HERMES_ACTIVE_PROFILE") is None

    def test_profile_context_sets_env(self, tmp_path, monkeypatch):
        from cron.scheduler import _profile_context
        from hermes_cli.profiles import get_profile_dir

        # Mock home to tmp_path
        monkeypatch.setattr("pathlib.Path.home", lambda: tmp_path)
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
        # Create a test profile
        profile_dir = get_profile_dir("testprofile")
        profile_dir.mkdir(parents=True)
        with _profile_context("testprofile"):
            assert os.environ["HERMES_ACTIVE_PROFILE"] == "testprofile"
            assert os.environ["HERMES_HOME"] == str(profile_dir)
        # After exit, env restored
        assert os.environ.get("HERMES_ACTIVE_PROFILE") is None
        assert os.environ["HERMES_HOME"] == str(tmp_path / ".hermes")

    def test_profile_context_missing_profile_warns(self, tmp_path, monkeypatch, caplog):
        from cron.scheduler import _profile_context

        monkeypatch.setattr("pathlib.Path.home", lambda: tmp_path)
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
        with caplog.at_level("WARNING"):
            with _profile_context("nonexistent"):
                pass
        assert "does not exist" in caplog.text


class TestTickParallelExecution:
    """Tests for parallel profile execution in tick()."""

    def test_jobs_grouped_by_profile(self, tmp_path, monkeypatch):
        from cron.scheduler import tick
        from cron.jobs import create_job, update_job, JOBS_FILE
        import datetime

        # Point jobs file to tmp_path so we don't pollute real cron db
        test_jobs_file = tmp_path / "jobs.json"
        monkeypatch.setattr("cron.jobs.JOBS_FILE", test_jobs_file)
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
        job1 = create_job(prompt="test1", schedule="* * * * *", name="j1", profile="alpha")
        job2 = create_job(prompt="test2", schedule="* * * * *", name="j2", profile="beta")
        job3 = create_job(prompt="test3", schedule="* * * * *", name="j3")  # no profile
        # Manually set next_run_at to now so they are due
        now = datetime.datetime.now(datetime.timezone.utc).isoformat()
        for job in [job1, job2, job3]:
            update_job(job["id"], {"next_run_at": now})
        with patch("cron.scheduler.run_job") as mock_run:
            mock_run.return_value = (True, "output", "response", None)
            with patch("cron.scheduler.save_job_output"):
                with patch("cron.scheduler._deliver_result", return_value=None):
                    executed = tick(verbose=False)
        # All 3 should have been executed
        assert executed == 3
        assert mock_run.call_count == 3
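To exercise only the new suites, a pytest selection along these lines should work (the test file's path is not shown in this compare view, so it is an assumption):

```bash
python -m pytest tests/test_cron_scheduler.py -k "ProfileContext or TickParallelExecution" -v
```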

View File

@@ -1,43 +0,0 @@
from __future__ import annotations

import importlib.util
import io
from pathlib import Path
from unittest.mock import patch

ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "ragflow_bootstrap.py"


def _load_module():
    spec = importlib.util.spec_from_file_location("ragflow_bootstrap", SCRIPT_PATH)
    module = importlib.util.module_from_spec(spec)
    assert spec.loader is not None
    spec.loader.exec_module(module)
    return module


def test_materialize_bundle_downloads_required_upstream_artifacts(tmp_path):
    module = _load_module()

    def fake_urlopen(url, timeout=0):
        name = url.rsplit("/", 1)[-1]
        return io.BytesIO(f"# fetched {name}\n".encode())

    with patch.object(module.urllib.request, "urlopen", side_effect=fake_urlopen):
        written = module.materialize_bundle(tmp_path)

    assert (tmp_path / "docker-compose.yml").exists()
    assert (tmp_path / "docker-compose-base.yml").exists()
    assert (tmp_path / ".env").exists()
    assert any(path.name == "entrypoint.sh" for path in written)


def test_build_compose_command_respects_profile_and_directory(tmp_path):
    module = _load_module()
    command = module.build_compose_command(tmp_path, profile="gpu")
    assert command[:4] == ["docker", "compose", "--profile", "gpu"]
    assert command[-2:] == ["up", "-d"]

View File

@@ -1,122 +0,0 @@
from __future__ import annotations

import importlib
import json
import sys
from pathlib import Path
from unittest.mock import patch

from tools.registry import registry


class _Response:
    def __init__(self, payload: dict, status_code: int = 200):
        self._payload = payload
        self.status_code = status_code
        self.text = json.dumps(payload)

    def json(self):
        return self._payload

    def raise_for_status(self):
        if self.status_code >= 400:
            raise RuntimeError(f"HTTP {self.status_code}")


def _reload_module():
    registry.deregister("ragflow_ingest")
    registry.deregister("ragflow_query")
    sys.modules.pop("tools.ragflow_tool", None)
    module = importlib.import_module("tools.ragflow_tool")
    return importlib.reload(module)


def test_ragflow_tools_register_and_support_document_formats():
    module = _reload_module()
    assert registry.get_entry("ragflow_ingest") is not None
    assert registry.get_entry("ragflow_query") is not None
    assert ".pdf" in module.SUPPORTED_EXTENSIONS
    assert ".docx" in module.SUPPORTED_EXTENSIONS
    assert ".png" in module.SUPPORTED_EXTENSIONS
    assert ".md" in module.SUPPORTED_EXTENSIONS


def test_ragflow_ingest_creates_dataset_uploads_and_starts_parse(tmp_path):
    module = _reload_module()
    document = tmp_path / "paper.pdf"
    document.write_bytes(b"%PDF-1.7\n")
    calls: list[tuple[str, str, dict | None, dict | None]] = []

    def fake_request(method, url, *, headers=None, params=None, json=None, files=None, timeout=None):
        calls.append((method, url, params, json))
        if method == "GET" and url.endswith("/api/v1/datasets"):
            return _Response({"code": 0, "data": []})
        if method == "POST" and url.endswith("/api/v1/datasets"):
            assert json["name"] == "research-papers"
            assert json["chunk_method"] == "paper"
            return _Response({"code": 0, "data": {"id": "ds-1", "name": "research-papers"}})
        if method == "POST" and url.endswith("/api/v1/datasets/ds-1/documents"):
            assert files and files[0][0] == "file"
            return _Response({"code": 0, "data": [{"id": "doc-1", "name": "paper.pdf"}]})
        if method == "POST" and url.endswith("/api/v1/datasets/ds-1/chunks"):
            assert json == {"document_ids": ["doc-1"]}
            return _Response({"code": 0})
        raise AssertionError(f"Unexpected request: {method} {url}")

    with patch("tools.ragflow_tool.requests.request", side_effect=fake_request):
        result = json.loads(module.ragflow_ingest_tool(str(document), dataset="research-papers"))

    assert result["dataset_id"] == "ds-1"
    assert result["document_ids"] == ["doc-1"]
    assert result["parse_started"] is True
    assert result["chunk_method"] == "paper"
    assert calls[0][0] == "GET"


def test_ragflow_query_retrieves_chunks_for_named_dataset():
    module = _reload_module()

    def fake_request(method, url, *, headers=None, params=None, json=None, files=None, timeout=None):
        if method == "GET" and url.endswith("/api/v1/datasets"):
            assert params == {"name": "tech-docs"}
            return _Response({"code": 0, "data": [{"id": "ds-9", "name": "tech-docs"}]})
        if method == "POST" and url.endswith("/api/v1/retrieval"):
            assert json["question"] == "How does parsing work?"
            assert json["dataset_ids"] == ["ds-9"]
            assert json["page_size"] == 2
            return _Response(
                {
                    "code": 0,
                    "data": {
                        "chunks": [
                            {
                                "content": "Parsing starts by uploading documents.",
                                "document_id": "doc-9",
                                "document_keyword": "guide.md",
                                "similarity": 0.98,
                            }
                        ],
                        "total": 1,
                    },
                }
            )
        raise AssertionError(f"Unexpected request: {method} {url}")

    with patch("tools.ragflow_tool.requests.request", side_effect=fake_request):
        result = json.loads(module.ragflow_query_tool("How does parsing work?", "tech-docs", limit=2))

    assert result["dataset_id"] == "ds-9"
    assert result["total"] == 1
    assert result["chunks"][0]["content"] == "Parsing starts by uploading documents."


def test_ragflow_ingest_rejects_unsupported_document_types(tmp_path):
    module = _reload_module()
    document = tmp_path / "binary.exe"
    document.write_bytes(b"MZ")
    result = json.loads(module.ragflow_ingest_tool(str(document), dataset="ignored"))
    assert "error" in result
    assert "Unsupported document type" in result["error"]

View File

@@ -215,6 +215,8 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
    }
    if job.get("script"):
        result["script"] = job["script"]
    if job.get("profile"):
        result["profile"] = job["profile"]
    return result
@@ -234,6 +236,7 @@ def cronjob(
    base_url: Optional[str] = None,
    reason: Optional[str] = None,
    script: Optional[str] = None,
    profile: Optional[str] = None,
    task_id: str = None,
) -> str:
    """Unified cron job management tool."""
@@ -271,6 +274,7 @@ def cronjob(
            provider=_normalize_optional_job_value(provider),
            base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True),
            script=_normalize_optional_job_value(script),
            profile=_normalize_optional_job_value(profile),
        )
        return json.dumps(
            {
@@ -366,6 +370,8 @@ def cronjob(
        repeat_state = dict(job.get("repeat") or {})
        repeat_state["times"] = normalized_repeat
        updates["repeat"] = repeat_state
        if profile is not None:
            updates["profile"] = _normalize_optional_job_value(profile)
        if schedule is not None:
            parsed_schedule = parse_schedule(schedule)
            updates["schedule"] = parsed_schedule

View File

@@ -1,344 +0,0 @@
#!/usr/bin/env python3
"""RAGFlow tool integration for document understanding.

Provides two tools:
- ragflow_ingest(document_url, dataset): upload and parse a document into RAGFlow
- ragflow_query(query, dataset): retrieve relevant chunks from a dataset

Default deployment target is a local RAGFlow server on http://localhost:9380.
"""
from __future__ import annotations

import json
import mimetypes
import os
import tempfile
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

import requests

from tools.registry import registry, tool_error, tool_result

RAGFLOW_INGEST_SCHEMA = {
    "name": "ragflow_ingest",
    "description": (
        "Upload a document into a RAGFlow dataset, creating the dataset if needed, "
        "then trigger parsing so Hermes can query the content later. Supports PDF, "
        "Word, images via OCR, plus text and code documents."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "document_url": {
                "type": "string",
                "description": "HTTP(S) URL, file:// URL, or local filesystem path to the document.",
            },
            "dataset": {
                "type": "string",
                "description": "Dataset name or id to ingest into. Created automatically when absent.",
            },
        },
        "required": ["document_url", "dataset"],
    },
}

RAGFLOW_QUERY_SCHEMA = {
    "name": "ragflow_query",
    "description": (
        "Query a RAGFlow dataset for relevant chunks. Useful for research papers, "
        "technical docs, OCR-processed images, and ingested codebase documents."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Question or search query to run against RAGFlow.",
            },
            "dataset": {
                "type": "string",
                "description": "Dataset name or id to search.",
            },
            "limit": {
                "type": "integer",
                "description": "Maximum number of chunks to return.",
                "default": 5,
                "minimum": 1,
                "maximum": 25,
            },
        },
        "required": ["query", "dataset"],
    },
}

SUPPORTED_EXTENSIONS = {
    ".pdf": "paper",
    ".doc": "paper",
    ".docx": "paper",
    ".ppt": "presentation",
    ".pptx": "presentation",
    ".png": "picture",
    ".jpg": "picture",
    ".jpeg": "picture",
    ".webp": "picture",
    ".bmp": "picture",
    ".tif": "picture",
    ".tiff": "picture",
    ".gif": "picture",
    ".txt": "naive",
    ".md": "naive",
    ".rst": "naive",
    ".html": "naive",
    ".htm": "naive",
    ".csv": "table",
    ".tsv": "table",
    ".json": "naive",
    ".yaml": "naive",
    ".yml": "naive",
    ".toml": "naive",
    ".ini": "naive",
    ".py": "naive",
    ".js": "naive",
    ".ts": "naive",
    ".tsx": "naive",
    ".jsx": "naive",
    ".java": "naive",
    ".go": "naive",
    ".rs": "naive",
    ".c": "naive",
    ".cc": "naive",
    ".cpp": "naive",
    ".h": "naive",
    ".hpp": "naive",
    ".rb": "naive",
    ".php": "naive",
    ".sql": "naive",
    ".sh": "naive",
}


def _ragflow_base_url() -> str:
    return os.getenv("RAGFLOW_API_URL", "http://localhost:9380").rstrip("/")


def _ragflow_headers(json_body: bool = True) -> dict[str, str]:
    headers: dict[str, str] = {}
    api_key = os.getenv("RAGFLOW_API_KEY", "").strip()
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    if json_body:
        headers["Content-Type"] = "application/json"
    return headers


def _ragflow_check_requirements() -> bool:
    return True


def _request_json(method: str, path: str, *, params=None, json_payload=None, files=None) -> dict[str, Any]:
    response = requests.request(
        method,
        f"{_ragflow_base_url()}{path}",
        headers=_ragflow_headers(json_body=files is None),
        params=params,
        json=json_payload,
        files=files,
        timeout=120,
    )
    response.raise_for_status()
    payload = response.json()
    if payload.get("code", 0) != 0:
        message = payload.get("message") or payload.get("error") or "RAGFlow request failed"
        raise RuntimeError(message)
    return payload


def _is_probable_dataset_id(dataset: str) -> bool:
    compact = dataset.replace("-", "")
    return len(compact) >= 16 and all(ch.isalnum() for ch in compact)


def _resolve_dataset(dataset: str) -> tuple[str, str] | None:
    dataset = dataset.strip()
    if not dataset:
        return None
    params = {"id": dataset} if _is_probable_dataset_id(dataset) else {"name": dataset}
    payload = _request_json("GET", "/api/v1/datasets", params=params)
    data = payload.get("data") or []
    if not data:
        return None
    match = data[0]
    return match["id"], match.get("name", dataset)


def _ensure_dataset(dataset: str, chunk_method: str) -> tuple[str, str]:
    resolved = _resolve_dataset(dataset)
    if resolved:
        return resolved
    payload = _request_json(
        "POST",
        "/api/v1/datasets",
        json_payload={"name": dataset, "chunk_method": chunk_method},
    )
    data = payload.get("data") or {}
    return data["id"], data.get("name", dataset)


def _prepare_document(document_url: str) -> tuple[Path, bool]:
    parsed = urlparse(document_url)
    if parsed.scheme in {"http", "https"}:
        response = requests.get(document_url, timeout=120)
        response.raise_for_status()
        suffix = Path(parsed.path).suffix or ".bin"
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        tmp.write(response.content)
        tmp.flush()
        tmp.close()
        return Path(tmp.name), True
    if parsed.scheme == "file":
        return Path(parsed.path), False
    return Path(document_url).expanduser(), False


def _detect_chunk_method(path: Path) -> str:
    extension = path.suffix.lower()
    if extension not in SUPPORTED_EXTENSIONS:
        supported = ", ".join(sorted(SUPPORTED_EXTENSIONS))
        raise ValueError(f"Unsupported document type '{extension or path.name}'. Supported document types: {supported}")
    return SUPPORTED_EXTENSIONS[extension]


def _upload_document(dataset_id: str, path: Path) -> list[str]:
    mime = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
    with path.open("rb") as handle:
        payload = _request_json(
            "POST",
            f"/api/v1/datasets/{dataset_id}/documents",
            files=[("file", (path.name, handle, mime))],
        )
    documents = payload.get("data") or []
    ids = [item["id"] for item in documents if item.get("id")]
    if not ids:
        raise RuntimeError("RAGFlow upload did not return any document ids")
    return ids


def ragflow_ingest_tool(document_url: str, dataset: str) -> str:
    local_path = None
    should_cleanup = False
    try:
        local_path, should_cleanup = _prepare_document(document_url)
        if not local_path.exists():
            return tool_error(f"Document not found: {document_url}")
        chunk_method = _detect_chunk_method(local_path)
        dataset_id, dataset_name = _ensure_dataset(dataset, chunk_method)
        document_ids = _upload_document(dataset_id, local_path)
        _request_json(
            "POST",
            f"/api/v1/datasets/{dataset_id}/chunks",
            json_payload={"document_ids": document_ids},
        )
        return tool_result(
            success=True,
            dataset_id=dataset_id,
            dataset_name=dataset_name,
            document_ids=document_ids,
            parse_started=True,
            chunk_method=chunk_method,
            source=document_url,
            filename=local_path.name,
        )
    except ValueError as exc:
        return tool_error(str(exc))
    except Exception as exc:
        return tool_error(f"RAGFlow ingest failed: {exc}")
    finally:
        if should_cleanup and local_path is not None:
            try:
                local_path.unlink(missing_ok=True)
            except Exception:
                pass


def _normalize_chunks(chunks: list[dict[str, Any]]) -> list[dict[str, Any]]:
    normalized = []
    for chunk in chunks:
        normalized.append(
            {
                "content": chunk.get("content", ""),
                "document_id": chunk.get("document_id", ""),
                "document_name": chunk.get("document_keyword", ""),
                "similarity": chunk.get("similarity"),
                "highlight": chunk.get("highlight", ""),
            }
        )
    return normalized


def ragflow_query_tool(query: str, dataset: str, limit: int = 5) -> str:
    try:
        resolved = _resolve_dataset(dataset)
        if not resolved:
            return tool_error(f"RAGFlow dataset not found: {dataset}")
        dataset_id, dataset_name = resolved
        payload = _request_json(
            "POST",
            "/api/v1/retrieval",
            json_payload={
                "question": query,
                "dataset_ids": [dataset_id],
                "page_size": max(1, min(int(limit), 25)),
                "highlight": True,
                "keyword": True,
            },
        )
        data = payload.get("data") or {}
        chunks = data.get("chunks") or []
        return tool_result(
            success=True,
            dataset_id=dataset_id,
            dataset_name=dataset_name,
            total=data.get("total", len(chunks)),
            chunks=_normalize_chunks(chunks),
        )
    except Exception as exc:
        return tool_error(f"RAGFlow query failed: {exc}")


def _handle_ragflow_ingest(args, **_kwargs):
    return ragflow_ingest_tool(
        document_url=args.get("document_url", ""),
        dataset=args.get("dataset", ""),
    )


def _handle_ragflow_query(args, **_kwargs):
    return ragflow_query_tool(
        query=args.get("query", ""),
        dataset=args.get("dataset", ""),
        limit=args.get("limit", 5),
    )


registry.register(
    name="ragflow_ingest",
    toolset="web",
    schema=RAGFLOW_INGEST_SCHEMA,
    handler=_handle_ragflow_ingest,
    check_fn=_ragflow_check_requirements,
    requires_env=["RAGFLOW_API_URL", "RAGFLOW_API_KEY"],
    emoji="📚",
)
registry.register(
    name="ragflow_query",
    toolset="web",
    schema=RAGFLOW_QUERY_SCHEMA,
    handler=_handle_ragflow_query,
    check_fn=_ragflow_check_requirements,
    requires_env=["RAGFLOW_API_URL", "RAGFLOW_API_KEY"],
    emoji="🧠",
)