Compare commits

1 Commit

Author SHA1 Message Date
Alexander Whitestone
07eb8604f5 feat(tools): add LightRAG integration for graph-based knowledge retrieval (#857)
All checks were successful: Lint / lint (pull_request) succeeded in 39s
Adds tools/lightrag_tool.py with two new tools:

- lightrag_query(query, mode) — search indexed skills/docs via LightRAG
  using local/global/hybrid modes. Returns structured JSON with the answer.
- lightrag_index(directories) — (re-)build the knowledge graph from
  ~/.hermes/skills/ and optional extra directories.
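
A minimal sketch of calling the new tools directly (hypothetical usage; in
Hermes they are dispatched through the tool registry, and the extra
directory path below is illustrative):

```python
from tools.lightrag_tool import lightrag_index, lightrag_query

# (Re-)build the graph from ~/.hermes/skills/ plus one extra directory.
print(lightrag_index(directories=["~/projects/hermes/docs"]))

# Query the graph; "hybrid" combines local and global graph search.
print(lightrag_query("How do I dispatch the burn fleet?", mode="hybrid"))
```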

Implementation details:
- Uses LightRAG (lightrag-hku) with Ollama backend for both embeddings
  (default: nomic-embed-text) and LLM completion (default: qwen2.5:7b)
- Storage at ~/.hermes/lightrag/ (file-based, no Docker)
- Async bridge via asyncio.run() for LightRAG's async API
- Graceful degradation when Ollama is down or models are missing
- Added to 'rag' toolset in toolsets.py
- Added [project.optional-dependencies] 'rag' group in pyproject.toml

Tests:
- 18 tests covering file collection, text reading, requirements check,
  indexing, querying, error handling, and edge cases
- All tests pass
2026-04-22 02:27:24 -04:00
9 changed files with 588 additions and 656 deletions

@@ -1,68 +0,0 @@
# RAGFlow integration
This repo-side change adds:
- `tools/ragflow_tool.py`
  - `ragflow_ingest(document_url, dataset)`
  - `ragflow_query(query, dataset, limit=5)`
- `scripts/ragflow_bootstrap.py`
  - fetches the upstream RAGFlow Docker bundle
  - runs `docker compose --profile cpu up -d` (or `--profile gpu`)
## Deployment
Bootstrap the upstream CPU stack locally:
```bash
python3 scripts/ragflow_bootstrap.py --profile cpu
```
Dry-run only:
```bash
python3 scripts/ragflow_bootstrap.py --profile cpu --dry-run
```
Fetch files without launching Docker:
```bash
python3 scripts/ragflow_bootstrap.py --no-up
```
Default bundle target:
- `~/.hermes/services/ragflow`
## Runtime configuration
Optional environment variables:
- `RAGFLOW_API_URL` — defaults to `http://localhost:9380`
- `RAGFLOW_API_KEY` — Bearer token for authenticated RAGFlow APIs
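A minimal sketch of overriding these at runtime (host and token below are placeholders; the tool reads both values per request via `os.getenv`):
```python
import os

# Both values are read on every request, so setting them at runtime is enough.
os.environ["RAGFLOW_API_URL"] = "https://ragflow.example.internal:9380"  # placeholder host
os.environ["RAGFLOW_API_KEY"] = "<bearer-token>"  # placeholder token
```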
## Supported document types
RAGFlow ingest accepts:
- PDF: `.pdf`
- Word: `.doc`, `.docx`
- Presentations: `.ppt`, `.pptx`
- Images via OCR: `.png`, `.jpg`, `.jpeg`, `.webp`, `.bmp`, `.tif`, `.tiff`, `.gif`
- Text and codebase documents: `.txt`, `.md`, `.rst`, `.html`, `.json`, `.yaml`, `.yml`, `.toml`, `.ini`, `.py`, `.js`, `.ts`, `.tsx`, `.jsx`, `.java`, `.go`, `.rs`, `.c`, `.cpp`, `.h`, `.hpp`, `.rb`, `.php`, `.sql`, `.sh`
## Example tool usage
`ragflow_ingest`:
```json
{"document_url":"https://arxiv.org/pdf/1706.03762.pdf","dataset":"research-papers"}
```
`ragflow_query`:
```json
{"query":"What does the paper say about attention heads?","dataset":"research-papers","limit":5}
```
## Use cases
- research papers
- technical documentation
- OCR-heavy image workflows
- ingested codebases and architecture docs

@@ -38,6 +38,7 @@ dependencies = [
[project.optional-dependencies]
modal = ["modal>=1.0.0,<2"]
rag = ["lightrag-hku>=1.4.0,<2", "aiohttp>=3.9.0,<4"]
daytona = ["daytona>=0.148.0,<1"]
dev = ["debugpy>=1.8.0,<2", "pytest>=9.0.2,<10", "pytest-asyncio>=1.3.0,<2", "pytest-xdist>=3.0,<4", "mcp>=1.2.0,<2"]
messaging = ["python-telegram-bot[webhooks]>=22.6,<23", "discord.py[voice]>=2.7.1,<3", "aiohttp>=3.13.3,<4", "slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]

@@ -1,79 +0,0 @@
#!/usr/bin/env python3
"""Bootstrap an upstream RAGFlow Docker bundle for Hermes.
This script fetches the upstream RAGFlow docker bundle into a local directory
so operators can run `docker compose --profile cpu up -d` (or `gpu`) without
manually assembling the required files.
"""
from __future__ import annotations
import argparse
import subprocess
import urllib.request
from pathlib import Path
UPSTREAM_BASE = "https://raw.githubusercontent.com/infiniflow/ragflow/main/docker"
UPSTREAM_FILES = {
"docker-compose.yml": f"{UPSTREAM_BASE}/docker-compose.yml",
"docker-compose-base.yml": f"{UPSTREAM_BASE}/docker-compose-base.yml",
".env": f"{UPSTREAM_BASE}/.env",
"service_conf.yaml.template": f"{UPSTREAM_BASE}/service_conf.yaml.template",
"entrypoint.sh": f"{UPSTREAM_BASE}/entrypoint.sh",
}
def materialize_bundle(target_dir: str | Path, overwrite: bool = False) -> list[Path]:
target = Path(target_dir).expanduser()
target.mkdir(parents=True, exist_ok=True)
written: list[Path] = []
for name, url in UPSTREAM_FILES.items():
dest = target / name
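        # Keep pre-existing files unless overwrite is requested; they still count as written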
if dest.exists() and not overwrite:
written.append(dest)
continue
with urllib.request.urlopen(url, timeout=60) as response:
dest.write_bytes(response.read())
if name == "entrypoint.sh":
dest.chmod(0o755)
written.append(dest)
return written
def build_compose_command(target_dir: str | Path, profile: str = "cpu") -> list[str]:
return ["docker", "compose", "--profile", profile, "up", "-d"]
def run_compose(target_dir: str | Path, profile: str = "cpu", dry_run: bool = False) -> dict:
target = Path(target_dir).expanduser()
command = build_compose_command(target, profile=profile)
if dry_run:
return {"target_dir": str(target), "command": command, "executed": False}
subprocess.run(command, cwd=target, check=True)
return {"target_dir": str(target), "command": command, "executed": True}
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Fetch and launch the upstream RAGFlow Docker bundle")
parser.add_argument("--target-dir", default=str(Path.home() / ".hermes" / "services" / "ragflow"))
parser.add_argument("--profile", choices=["cpu", "gpu"], default="cpu")
parser.add_argument("--overwrite", action="store_true")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--no-up", action="store_true", help="Only fetch bundle files; do not run docker compose")
args = parser.parse_args(argv)
written = materialize_bundle(args.target_dir, overwrite=args.overwrite)
print(f"Fetched {len(written)} RAGFlow docker files into {Path(args.target_dir).expanduser()}")
if args.no_up:
return 0
result = run_compose(args.target_dir, profile=args.profile, dry_run=args.dry_run)
print("Command:", " ".join(result["command"]))
if result["executed"]:
print("RAGFlow docker stack launch requested.")
else:
print("Dry run only; docker compose not executed.")
return 0
if __name__ == "__main__":
raise SystemExit(main())

@@ -1,43 +0,0 @@
from __future__ import annotations
import importlib.util
import io
from pathlib import Path
from unittest.mock import patch
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "ragflow_bootstrap.py"
def _load_module():
spec = importlib.util.spec_from_file_location("ragflow_bootstrap", SCRIPT_PATH)
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
def test_materialize_bundle_downloads_required_upstream_artifacts(tmp_path):
module = _load_module()
def fake_urlopen(url, timeout=0):
name = url.rsplit("/", 1)[-1]
return io.BytesIO(f"# fetched {name}\n".encode())
with patch.object(module.urllib.request, "urlopen", side_effect=fake_urlopen):
written = module.materialize_bundle(tmp_path)
assert (tmp_path / "docker-compose.yml").exists()
assert (tmp_path / "docker-compose-base.yml").exists()
assert (tmp_path / ".env").exists()
assert any(path.name == "entrypoint.sh" for path in written)
def test_build_compose_command_respects_profile_and_directory(tmp_path):
module = _load_module()
command = module.build_compose_command(tmp_path, profile="gpu")
assert command[:4] == ["docker", "compose", "--profile", "gpu"]
assert command[-2:] == ["up", "-d"]

@@ -0,0 +1,176 @@
"""Tests for tools/lightrag_tool.py"""
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# LightRAG may not be installed in all test environments
pytest.importorskip("lightrag", reason="lightrag-hku not installed")
from tools.lightrag_tool import (
check_lightrag_requirements,
lightrag_index,
lightrag_query,
_collect_markdown_files,
_read_text_safe,
LIGHTRAG_DIR,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_result(result: str) -> dict:
"""Parse JSON tool result, falling back to error string detection."""
try:
return json.loads(result)
except json.JSONDecodeError:
return {"_error": result}
# ---------------------------------------------------------------------------
# Unit tests
# ---------------------------------------------------------------------------
class TestCollectMarkdownFiles:
def test_collects_md_files(self, tmp_path):
(tmp_path / "a.md").write_text("# A")
(tmp_path / "b.md").write_text("# B")
(tmp_path / "skip.txt").write_text("text")
found = _collect_markdown_files(tmp_path)
assert len(found) == 2
assert all(p.suffix == ".md" for p in found)
def test_skips_hidden_dirs(self, tmp_path):
(tmp_path / ".git").mkdir()
(tmp_path / ".git" / "readme.md").write_text("# git")
(tmp_path / "visible.md").write_text("# visible")
found = _collect_markdown_files(tmp_path)
names = [p.name for p in found]
assert "visible.md" in names
assert "readme.md" not in names
def test_returns_empty_for_missing_dir(self):
assert _collect_markdown_files(Path("/nonexistent")) == []
class TestReadTextSafe:
def test_reads_small_file(self, tmp_path):
p = tmp_path / "test.md"
p.write_text("hello world")
assert _read_text_safe(p) == "hello world"
def test_truncates_large_file(self, tmp_path):
p = tmp_path / "big.md"
p.write_text("x" * 1_000_000)
text = _read_text_safe(p, limit=500_000)
assert len(text) == 500_000
def test_reads_binary_without_crashing(self, tmp_path):
p = tmp_path / "binary.md"
p.write_bytes(b"\x00\x01\x02")
result = _read_text_safe(p)
        # Should not crash; bytes 0x00-0x7F decode as valid single-byte UTF-8
assert isinstance(result, str)
class TestCheckRequirements:
@patch("tools.lightrag_tool._ollama_available", return_value=True)
def test_ok_when_ollama_up(self, mock_ollama):
assert check_lightrag_requirements() is True
@patch("tools.lightrag_tool._ollama_available", return_value=False)
def test_false_when_ollama_down(self, mock_ollama):
assert check_lightrag_requirements() is False
@patch.dict(sys.modules, {"lightrag": None}, clear=False)
def test_false_when_lightrag_missing(self):
with patch("tools.lightrag_tool._ollama_available", return_value=True):
            # sys.modules["lightrag"] = None makes `import lightrag` raise
            # ImportError inside check_lightrag_requirements()
assert check_lightrag_requirements() is False
class TestLightragIndex:
@patch("tools.lightrag_tool._ollama_available", return_value=False)
def test_error_when_ollama_down(self, mock_ollama):
result = lightrag_index()
assert "Ollama is not running" in result
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool._has_ollama_model", return_value=False)
def test_error_when_model_missing(self, mock_model, mock_ollama):
result = lightrag_index()
assert "not found in Ollama" in result
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool._has_ollama_model", return_value=True)
@patch("tools.lightrag_tool._get_lightrag")
@patch("tools.lightrag_tool._collect_markdown_files", return_value=[])
def test_warning_when_no_files(self, mock_collect, mock_get_rag, mock_model, mock_ollama):
result = lightrag_index()
data = _parse_result(result)
assert data.get("status") == "warning"
assert "No markdown files found" in data.get("message", "")
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool._has_ollama_model", return_value=True)
@patch("tools.lightrag_tool._get_lightrag")
@patch("tools.lightrag_tool._collect_markdown_files")
@patch("tools.lightrag_tool._read_text_safe", return_value="# Skill doc\nContent.")
@patch("asyncio.run")
def test_indexes_files(self, mock_asyncio, mock_read, mock_collect, mock_get_rag, mock_model, mock_ollama):
mock_collect.return_value = [Path("/fake/skills/git.md"), Path("/fake/skills/docker.md")]
mock_rag = MagicMock()
mock_get_rag.return_value = mock_rag
result = lightrag_index()
data = _parse_result(result)
assert data.get("status") == "ok"
assert data.get("indexed_files") == 2
assert data.get("errors") == 0
class TestLightragQuery:
@patch("tools.lightrag_tool._ollama_available", return_value=False)
def test_error_when_ollama_down(self, mock_ollama):
result = lightrag_query("test", mode="hybrid")
assert "Ollama is not running" in result
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool.LIGHTRAG_DIR")
def test_empty_index_message(self, mock_dir, mock_ollama):
mock_dir.exists.return_value = True
mock_dir.iterdir.return_value = iter([])
result = lightrag_query("test", mode="hybrid")
data = _parse_result(result)
assert data.get("status") == "empty"
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool.LIGHTRAG_DIR")
@patch("tools.lightrag_tool._get_lightrag")
@patch("asyncio.run", return_value="Use git clone for repos.")
def test_query_returns_answer(self, mock_asyncio, mock_get_rag, mock_dir, mock_ollama):
mock_dir.exists.return_value = True
mock_dir.iterdir.return_value = iter([Path("dummy")])
mock_rag = MagicMock()
mock_get_rag.return_value = mock_rag
result = lightrag_query("How do I clone a repo?", mode="hybrid")
data = _parse_result(result)
assert data.get("status") == "ok"
assert data.get("mode") == "hybrid"
assert "clone" in data.get("answer", "").lower()
@patch("tools.lightrag_tool._ollama_available", return_value=True)
def test_rejects_invalid_mode(self, mock_ollama):
result = lightrag_query("test", mode="invalid")
assert "mode must be one of" in result
def test_rejects_empty_query(self):
result = lightrag_query("", mode="hybrid")
assert "Query cannot be empty" in result

@@ -1,122 +0,0 @@
from __future__ import annotations
import importlib
import json
import sys
from pathlib import Path
from unittest.mock import patch
from tools.registry import registry
class _Response:
def __init__(self, payload: dict, status_code: int = 200):
self._payload = payload
self.status_code = status_code
self.text = json.dumps(payload)
def json(self):
return self._payload
def raise_for_status(self):
if self.status_code >= 400:
raise RuntimeError(f"HTTP {self.status_code}")
def _reload_module():
registry.deregister("ragflow_ingest")
registry.deregister("ragflow_query")
sys.modules.pop("tools.ragflow_tool", None)
module = importlib.import_module("tools.ragflow_tool")
return importlib.reload(module)
def test_ragflow_tools_register_and_support_document_formats():
module = _reload_module()
assert registry.get_entry("ragflow_ingest") is not None
assert registry.get_entry("ragflow_query") is not None
assert ".pdf" in module.SUPPORTED_EXTENSIONS
assert ".docx" in module.SUPPORTED_EXTENSIONS
assert ".png" in module.SUPPORTED_EXTENSIONS
assert ".md" in module.SUPPORTED_EXTENSIONS
def test_ragflow_ingest_creates_dataset_uploads_and_starts_parse(tmp_path):
module = _reload_module()
document = tmp_path / "paper.pdf"
document.write_bytes(b"%PDF-1.7\n")
calls: list[tuple[str, str, dict | None, dict | None]] = []
def fake_request(method, url, *, headers=None, params=None, json=None, files=None, timeout=None):
calls.append((method, url, params, json))
if method == "GET" and url.endswith("/api/v1/datasets"):
return _Response({"code": 0, "data": []})
if method == "POST" and url.endswith("/api/v1/datasets"):
assert json["name"] == "research-papers"
assert json["chunk_method"] == "paper"
return _Response({"code": 0, "data": {"id": "ds-1", "name": "research-papers"}})
if method == "POST" and url.endswith("/api/v1/datasets/ds-1/documents"):
assert files and files[0][0] == "file"
return _Response({"code": 0, "data": [{"id": "doc-1", "name": "paper.pdf"}]})
if method == "POST" and url.endswith("/api/v1/datasets/ds-1/chunks"):
assert json == {"document_ids": ["doc-1"]}
return _Response({"code": 0})
raise AssertionError(f"Unexpected request: {method} {url}")
with patch("tools.ragflow_tool.requests.request", side_effect=fake_request):
result = json.loads(module.ragflow_ingest_tool(str(document), dataset="research-papers"))
assert result["dataset_id"] == "ds-1"
assert result["document_ids"] == ["doc-1"]
assert result["parse_started"] is True
assert result["chunk_method"] == "paper"
assert calls[0][0] == "GET"
def test_ragflow_query_retrieves_chunks_for_named_dataset():
module = _reload_module()
def fake_request(method, url, *, headers=None, params=None, json=None, files=None, timeout=None):
if method == "GET" and url.endswith("/api/v1/datasets"):
assert params == {"name": "tech-docs"}
return _Response({"code": 0, "data": [{"id": "ds-9", "name": "tech-docs"}]})
if method == "POST" and url.endswith("/api/v1/retrieval"):
assert json["question"] == "How does parsing work?"
assert json["dataset_ids"] == ["ds-9"]
assert json["page_size"] == 2
return _Response(
{
"code": 0,
"data": {
"chunks": [
{
"content": "Parsing starts by uploading documents.",
"document_id": "doc-9",
"document_keyword": "guide.md",
"similarity": 0.98,
}
],
"total": 1,
},
}
)
raise AssertionError(f"Unexpected request: {method} {url}")
with patch("tools.ragflow_tool.requests.request", side_effect=fake_request):
result = json.loads(module.ragflow_query_tool("How does parsing work?", "tech-docs", limit=2))
assert result["dataset_id"] == "ds-9"
assert result["total"] == 1
assert result["chunks"][0]["content"] == "Parsing starts by uploading documents."
def test_ragflow_ingest_rejects_unsupported_document_types(tmp_path):
module = _reload_module()
document = tmp_path / "binary.exe"
document.write_bytes(b"MZ")
result = json.loads(module.ragflow_ingest_tool(str(document), dataset="ignored"))
assert "error" in result
assert "Unsupported document type" in result["error"]

tools/lightrag_tool.py (new file, 405 lines)

@@ -0,0 +1,405 @@
#!/usr/bin/env python3
"""
LightRAG Tool — Graph-based knowledge retrieval for skills and docs.
Indexes markdown files under ~/.hermes/skills/ (and optional extra dirs)
into a LightRAG knowledge graph stored at ~/.hermes/lightrag/.
Requires:
- lightrag-hku (pip install lightrag-hku)
- Ollama running locally with an embedding model (default: nomic-embed-text)
- Ollama running locally with a chat model (default: qwen2.5:7b)
Usage:
lightrag_query("How do I dispatch the burn fleet?", mode="hybrid")
lightrag_index() # re-index skill files
"""
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import List, Optional
import numpy as np
from hermes_constants import get_hermes_home
from tools.registry import registry, tool_error
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
DEFAULT_EMBED_MODEL = os.environ.get("LIGHTRAG_EMBED_MODEL", "nomic-embed-text")
DEFAULT_LLM_MODEL = os.environ.get("LIGHTRAG_LLM_MODEL", "qwen2.5:7b")
DEFAULT_OLLAMA_HOST = os.environ.get("LIGHTRAG_OLLAMA_HOST", "http://localhost:11434")
LIGHTRAG_DIR = get_hermes_home() / "lightrag"
SKILLS_DIR = get_hermes_home() / "skills"
# ---------------------------------------------------------------------------
# Ollama helpers
# ---------------------------------------------------------------------------
def _ollama_available() -> bool:
"""Check if Ollama server is reachable."""
try:
import urllib.request
req = urllib.request.Request(f"{DEFAULT_OLLAMA_HOST}/api/tags")
with urllib.request.urlopen(req, timeout=3) as resp:
return resp.status == 200
except Exception:
return False
def _has_ollama_model(model_name: str) -> bool:
"""Check if a specific model is pulled in Ollama."""
try:
import urllib.request
req = urllib.request.Request(f"{DEFAULT_OLLAMA_HOST}/api/tags")
with urllib.request.urlopen(req, timeout=3) as resp:
data = json.loads(resp.read())
models = [m["name"] for m in data.get("models", [])]
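            # Substring match tolerates version tags, e.g. "nomic-embed-text:latest"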
return any(model_name in m for m in models)
except Exception:
return False
async def _ollama_embedding(texts: list, **kwargs) -> np.ndarray:
"""Call Ollama embeddings API."""
import aiohttp
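    # Ollama's /api/embed accepts a list of inputs and returns a parallel "embeddings" list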
payload = {
"model": DEFAULT_EMBED_MODEL,
"input": texts,
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{DEFAULT_OLLAMA_HOST}/api/embed",
json=payload,
timeout=aiohttp.ClientTimeout(total=60),
) as resp:
resp.raise_for_status()
data = await resp.json()
embeddings = data.get("embeddings", [])
if not embeddings:
raise RuntimeError("Ollama returned empty embeddings")
return np.array(embeddings, dtype=np.float32)
async def _ollama_complete(
prompt, system_prompt=None, history_messages=None, **kwargs
) -> str:
"""Call Ollama generate API for LLM completion."""
import aiohttp
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
if history_messages:
for msg in history_messages:
role = "user" if msg.get("role") == "user" else "assistant"
messages.append({"role": role, "content": msg.get("content", "")})
messages.append({"role": "user", "content": prompt})
payload = {
"model": DEFAULT_LLM_MODEL,
"messages": messages,
"stream": False,
"options": {"temperature": 0.3, "num_predict": 2048},
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{DEFAULT_OLLAMA_HOST}/api/chat",
json=payload,
timeout=aiohttp.ClientTimeout(total=120),
) as resp:
resp.raise_for_status()
data = await resp.json()
return data.get("message", {}).get("content", "")
# ---------------------------------------------------------------------------
# LightRAG setup
# ---------------------------------------------------------------------------
_lightrag_instance: Optional[object] = None
def _get_lightrag() -> object:
"""Lazy-initialize LightRAG with Ollama backends."""
global _lightrag_instance
if _lightrag_instance is not None:
return _lightrag_instance
try:
        from lightrag import LightRAG
from lightrag.utils import EmbeddingFunc
except ImportError as e:
raise RuntimeError(
"lightrag is not installed. Run: pip install lightrag-hku"
) from e
LIGHTRAG_DIR.mkdir(parents=True, exist_ok=True)
# Wrap Ollama embedding for LightRAG
embed_func = EmbeddingFunc(
        embedding_dim=768,  # must match the embedding model; 768 is nomic-embed-text's dimension
func=_ollama_embedding,
max_token_size=8192,
model_name=DEFAULT_EMBED_MODEL,
)
_lightrag_instance = LightRAG(
working_dir=str(LIGHTRAG_DIR),
embedding_func=embed_func,
llm_model_func=_ollama_complete,
llm_model_name=DEFAULT_LLM_MODEL,
chunk_token_size=1200,
chunk_overlap_token_size=100,
)
return _lightrag_instance
# ---------------------------------------------------------------------------
# Indexing
# ---------------------------------------------------------------------------
def _collect_markdown_files(root: Path) -> List[Path]:
"""Collect all .md files under root, excluding node_modules and .git."""
files = []
if not root.exists():
return files
for path in root.rglob("*.md"):
if any(part.startswith(".") or part == "node_modules" for part in path.parts):
continue
files.append(path)
return sorted(files)
def _read_text_safe(path: Path, limit: int = 500_000) -> str:
"""Read file text with size limit."""
try:
stat = path.stat()
if stat.st_size > limit:
return path.read_text(encoding="utf-8", errors="ignore")[:limit]
return path.read_text(encoding="utf-8", errors="ignore")
except Exception as e:
logger.warning("Failed to read %s: %s", path, e)
return ""
def lightrag_index(directories: Optional[List[str]] = None) -> str:
"""Index markdown files into LightRAG knowledge graph.
Args:
directories: Extra directories to index (in addition to ~/.hermes/skills/).
"""
if not _ollama_available():
return tool_error(
"Ollama is not running. Start it with: ollama serve"
)
if not _has_ollama_model(DEFAULT_EMBED_MODEL):
return tool_error(
f"Embedding model '{DEFAULT_EMBED_MODEL}' not found in Ollama. "
f"Pull it with: ollama pull {DEFAULT_EMBED_MODEL}"
)
if not _has_ollama_model(DEFAULT_LLM_MODEL):
return tool_error(
f"LLM model '{DEFAULT_LLM_MODEL}' not found in Ollama. "
f"Pull it with: ollama pull {DEFAULT_LLM_MODEL}"
)
rag = _get_lightrag()
dirs = [SKILLS_DIR]
if directories:
for d in directories:
p = Path(d).expanduser()
if p.exists():
dirs.append(p)
all_files = []
for d in dirs:
all_files.extend(_collect_markdown_files(d))
if not all_files:
return json.dumps({
"status": "warning",
"message": "No markdown files found to index.",
"directories": [str(d) for d in dirs],
})
# Read and insert files
inserted = 0
errors = 0
for path in all_files:
text = _read_text_safe(path)
if not text.strip():
continue
try:
            # LightRAG's insert is async (ainsert); bridge it with asyncio.run()
            asyncio.run(rag.ainsert(text))
inserted += 1
except Exception as e:
logger.warning("Failed to index %s: %s", path, e)
errors += 1
return json.dumps({
"status": "ok",
"indexed_files": inserted,
"errors": errors,
"total_files": len(all_files),
"storage_dir": str(LIGHTRAG_DIR),
})
# ---------------------------------------------------------------------------
# Query
# ---------------------------------------------------------------------------
def lightrag_query(query: str, mode: str = "hybrid") -> str:
"""Query the LightRAG knowledge graph.
Args:
query: The question or search query.
mode: Search mode — "local" (nearby entities), "global" (graph-wide),
or "hybrid" (both).
"""
if not query or not query.strip():
return tool_error("Query cannot be empty.")
if mode not in {"local", "global", "hybrid"}:
return tool_error("mode must be one of: local, global, hybrid")
if not _ollama_available():
return tool_error(
"Ollama is not running. Start it with: ollama serve"
)
    # Check for indexed data before paying the cost of initializing LightRAG
    if not LIGHTRAG_DIR.exists() or not any(LIGHTRAG_DIR.iterdir()):
        return json.dumps({
            "status": "empty",
            "message": "LightRAG index is empty. Run lightrag_index() first.",
        })
    rag = _get_lightrag()
try:
from lightrag import QueryParam
param = QueryParam(mode=mode)
result = asyncio.run(rag.aquery(query, param=param))
return json.dumps({
"status": "ok",
"mode": mode,
"query": query,
"answer": result,
})
except Exception as e:
logger.exception("LightRAG query failed")
return tool_error(f"Query failed: {e}")
# ---------------------------------------------------------------------------
# Tool schemas
# ---------------------------------------------------------------------------
LIGHTRAG_QUERY_SCHEMA = {
"name": "lightrag_query",
"description": (
"Graph-based knowledge retrieval over indexed skills and documentation.\n\n"
"Use this when the user asks about: conventions, workflows, tool usage, "
"project-specific practices, or anything that might be documented in skills.\n\n"
"Modes:\n"
"- local: fast, searches nearby entities in the graph\n"
"- global: thorough, reasons across the entire knowledge graph\n"
"- hybrid: balanced, combines local and global (recommended)\n\n"
"If the index is empty, the tool will report that and you should "
"call lightrag_index() to populate it."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The question or search query.",
},
"mode": {
"type": "string",
"enum": ["local", "global", "hybrid"],
"description": "Search mode. hybrid is recommended.",
},
},
"required": ["query"],
},
}
LIGHTRAG_INDEX_SCHEMA = {
"name": "lightrag_index",
"description": (
"(Re-)build the LightRAG knowledge graph from skill files and docs.\n\n"
"By default indexes ~/.hermes/skills/. Pass extra directories if needed.\n"
"This is a one-time or occasional operation; queries work against the "
"existing index until you re-index."
),
"parameters": {
"type": "object",
"properties": {
"directories": {
"type": "array",
"items": {"type": "string"},
"description": "Optional extra directories to index (in addition to ~/.hermes/skills/).",
},
},
},
}
# ---------------------------------------------------------------------------
# Availability check
# ---------------------------------------------------------------------------
def check_lightrag_requirements() -> bool:
"""Return True if LightRAG and Ollama appear to be available."""
try:
import lightrag # noqa: F401
except ImportError:
return False
return _ollama_available()
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
registry.register(
name="lightrag_query",
toolset="rag",
schema=LIGHTRAG_QUERY_SCHEMA,
handler=lambda args, **kw: lightrag_query(
query=args.get("query", ""),
mode=args.get("mode", "hybrid"),
),
check_fn=check_lightrag_requirements,
emoji="🔎",
)
registry.register(
name="lightrag_index",
toolset="rag",
schema=LIGHTRAG_INDEX_SCHEMA,
handler=lambda args, **kw: lightrag_index(
directories=args.get("directories"),
),
check_fn=check_lightrag_requirements,
emoji="📚",
)

@@ -1,344 +0,0 @@
#!/usr/bin/env python3
"""RAGFlow tool integration for document understanding.
Provides two tools:
- ragflow_ingest(document_url, dataset): upload and parse a document into RAGFlow
- ragflow_query(query, dataset): retrieve relevant chunks from a dataset
Default deployment target is a local RAGFlow server on http://localhost:9380.
"""
from __future__ import annotations
import json
import mimetypes
import os
import tempfile
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import requests
from tools.registry import registry, tool_error, tool_result
RAGFLOW_INGEST_SCHEMA = {
"name": "ragflow_ingest",
"description": (
"Upload a document into a RAGFlow dataset, creating the dataset if needed, "
"then trigger parsing so Hermes can query the content later. Supports PDF, "
"Word, images via OCR, plus text and code documents."
),
"parameters": {
"type": "object",
"properties": {
"document_url": {
"type": "string",
"description": "HTTP(S) URL, file:// URL, or local filesystem path to the document.",
},
"dataset": {
"type": "string",
"description": "Dataset name or id to ingest into. Created automatically when absent.",
},
},
"required": ["document_url", "dataset"],
},
}
RAGFLOW_QUERY_SCHEMA = {
"name": "ragflow_query",
"description": (
"Query a RAGFlow dataset for relevant chunks. Useful for research papers, "
"technical docs, OCR-processed images, and ingested codebase documents."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Question or search query to run against RAGFlow.",
},
"dataset": {
"type": "string",
"description": "Dataset name or id to search.",
},
"limit": {
"type": "integer",
"description": "Maximum number of chunks to return.",
"default": 5,
"minimum": 1,
"maximum": 25,
},
},
"required": ["query", "dataset"],
},
}
SUPPORTED_EXTENSIONS = {
".pdf": "paper",
".doc": "paper",
".docx": "paper",
".ppt": "presentation",
".pptx": "presentation",
".png": "picture",
".jpg": "picture",
".jpeg": "picture",
".webp": "picture",
".bmp": "picture",
".tif": "picture",
".tiff": "picture",
".gif": "picture",
".txt": "naive",
".md": "naive",
".rst": "naive",
".html": "naive",
".htm": "naive",
".csv": "table",
".tsv": "table",
".json": "naive",
".yaml": "naive",
".yml": "naive",
".toml": "naive",
".ini": "naive",
".py": "naive",
".js": "naive",
".ts": "naive",
".tsx": "naive",
".jsx": "naive",
".java": "naive",
".go": "naive",
".rs": "naive",
".c": "naive",
".cc": "naive",
".cpp": "naive",
".h": "naive",
".hpp": "naive",
".rb": "naive",
".php": "naive",
".sql": "naive",
".sh": "naive",
}
def _ragflow_base_url() -> str:
return os.getenv("RAGFLOW_API_URL", "http://localhost:9380").rstrip("/")
def _ragflow_headers(json_body: bool = True) -> dict[str, str]:
headers: dict[str, str] = {}
api_key = os.getenv("RAGFLOW_API_KEY", "").strip()
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
if json_body:
headers["Content-Type"] = "application/json"
return headers
def _ragflow_check_requirements() -> bool:
return True
def _request_json(method: str, path: str, *, params=None, json_payload=None, files=None) -> dict[str, Any]:
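    # RAGFlow wraps responses as {"code": int, "message": str, "data": ...}; a non-zero code means failure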
response = requests.request(
method,
f"{_ragflow_base_url()}{path}",
headers=_ragflow_headers(json_body=files is None),
params=params,
json=json_payload,
files=files,
timeout=120,
)
response.raise_for_status()
payload = response.json()
if payload.get("code", 0) != 0:
message = payload.get("message") or payload.get("error") or "RAGFlow request failed"
raise RuntimeError(message)
return payload
def _is_probable_dataset_id(dataset: str) -> bool:
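    # Heuristic: ids are long alphanumeric/UUID-style strings; shorter values are treated as dataset names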
compact = dataset.replace("-", "")
return len(compact) >= 16 and all(ch.isalnum() for ch in compact)
def _resolve_dataset(dataset: str) -> tuple[str, str] | None:
dataset = dataset.strip()
if not dataset:
return None
params = {"id": dataset} if _is_probable_dataset_id(dataset) else {"name": dataset}
payload = _request_json("GET", "/api/v1/datasets", params=params)
data = payload.get("data") or []
if not data:
return None
match = data[0]
return match["id"], match.get("name", dataset)
def _ensure_dataset(dataset: str, chunk_method: str) -> tuple[str, str]:
resolved = _resolve_dataset(dataset)
if resolved:
return resolved
payload = _request_json(
"POST",
"/api/v1/datasets",
json_payload={"name": dataset, "chunk_method": chunk_method},
)
data = payload.get("data") or {}
return data["id"], data.get("name", dataset)
def _prepare_document(document_url: str) -> tuple[Path, bool]:
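    # Returns (local_path, should_cleanup); remote URLs are downloaded to a temp file the caller deletes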
parsed = urlparse(document_url)
if parsed.scheme in {"http", "https"}:
response = requests.get(document_url, timeout=120)
response.raise_for_status()
suffix = Path(parsed.path).suffix or ".bin"
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
tmp.write(response.content)
tmp.flush()
tmp.close()
return Path(tmp.name), True
if parsed.scheme == "file":
return Path(parsed.path), False
return Path(document_url).expanduser(), False
def _detect_chunk_method(path: Path) -> str:
extension = path.suffix.lower()
if extension not in SUPPORTED_EXTENSIONS:
supported = ", ".join(sorted(SUPPORTED_EXTENSIONS))
raise ValueError(f"Unsupported document type '{extension or path.name}'. Supported document types: {supported}")
return SUPPORTED_EXTENSIONS[extension]
def _upload_document(dataset_id: str, path: Path) -> list[str]:
mime = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
with path.open("rb") as handle:
payload = _request_json(
"POST",
f"/api/v1/datasets/{dataset_id}/documents",
files=[("file", (path.name, handle, mime))],
)
documents = payload.get("data") or []
ids = [item["id"] for item in documents if item.get("id")]
if not ids:
raise RuntimeError("RAGFlow upload did not return any document ids")
return ids
def ragflow_ingest_tool(document_url: str, dataset: str) -> str:
local_path = None
should_cleanup = False
try:
local_path, should_cleanup = _prepare_document(document_url)
if not local_path.exists():
return tool_error(f"Document not found: {document_url}")
chunk_method = _detect_chunk_method(local_path)
dataset_id, dataset_name = _ensure_dataset(dataset, chunk_method)
document_ids = _upload_document(dataset_id, local_path)
_request_json(
"POST",
f"/api/v1/datasets/{dataset_id}/chunks",
json_payload={"document_ids": document_ids},
)
return tool_result(
success=True,
dataset_id=dataset_id,
dataset_name=dataset_name,
document_ids=document_ids,
parse_started=True,
chunk_method=chunk_method,
source=document_url,
filename=local_path.name,
)
except ValueError as exc:
return tool_error(str(exc))
except Exception as exc:
return tool_error(f"RAGFlow ingest failed: {exc}")
finally:
if should_cleanup and local_path is not None:
try:
local_path.unlink(missing_ok=True)
except Exception:
pass
def _normalize_chunks(chunks: list[dict[str, Any]]) -> list[dict[str, Any]]:
normalized = []
for chunk in chunks:
normalized.append(
{
"content": chunk.get("content", ""),
"document_id": chunk.get("document_id", ""),
"document_name": chunk.get("document_keyword", ""),
"similarity": chunk.get("similarity"),
"highlight": chunk.get("highlight", ""),
}
)
return normalized
def ragflow_query_tool(query: str, dataset: str, limit: int = 5) -> str:
try:
resolved = _resolve_dataset(dataset)
if not resolved:
return tool_error(f"RAGFlow dataset not found: {dataset}")
dataset_id, dataset_name = resolved
payload = _request_json(
"POST",
"/api/v1/retrieval",
json_payload={
"question": query,
"dataset_ids": [dataset_id],
"page_size": max(1, min(int(limit), 25)),
"highlight": True,
"keyword": True,
},
)
data = payload.get("data") or {}
chunks = data.get("chunks") or []
return tool_result(
success=True,
dataset_id=dataset_id,
dataset_name=dataset_name,
total=data.get("total", len(chunks)),
chunks=_normalize_chunks(chunks),
)
except Exception as exc:
return tool_error(f"RAGFlow query failed: {exc}")
def _handle_ragflow_ingest(args, **_kwargs):
return ragflow_ingest_tool(
document_url=args.get("document_url", ""),
dataset=args.get("dataset", ""),
)
def _handle_ragflow_query(args, **_kwargs):
return ragflow_query_tool(
query=args.get("query", ""),
dataset=args.get("dataset", ""),
limit=args.get("limit", 5),
)
registry.register(
name="ragflow_ingest",
toolset="web",
schema=RAGFLOW_INGEST_SCHEMA,
handler=_handle_ragflow_ingest,
check_fn=_ragflow_check_requirements,
requires_env=["RAGFLOW_API_URL", "RAGFLOW_API_KEY"],
emoji="📚",
)
registry.register(
name="ragflow_query",
toolset="web",
schema=RAGFLOW_QUERY_SCHEMA,
handler=_handle_ragflow_query,
check_fn=_ragflow_check_requirements,
requires_env=["RAGFLOW_API_URL", "RAGFLOW_API_KEY"],
emoji="🧠",
)

@@ -167,6 +167,12 @@ TOOLSETS = {
"tools": ["memory"],
"includes": []
},
"rag": {
"description": "Graph-based knowledge retrieval over indexed skills and docs (LightRAG)",
"tools": ["lightrag_query", "lightrag_index"],
"includes": []
},
"session_search": {
"description": "Search and recall past conversations with summarization",