Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
eb16a6671e feat: add ragflow integration for #859
All checks were successful
Lint / lint (pull_request) Successful in 9s
2026-04-22 02:31:01 -04:00
Alexander Whitestone
18998b60c3 test: define ragflow integration for #859 2026-04-22 02:26:43 -04:00
10 changed files with 664 additions and 175 deletions

View File

@@ -0,0 +1,68 @@
# RAGFlow integration
This repo-side slice adds:
- `tools/ragflow_tool.py`
- `ragflow_ingest(document_url, dataset)`
- `ragflow_query(query, dataset, limit=5)`
- `scripts/ragflow_bootstrap.py`
- fetches the upstream RAGFlow Docker bundle
- runs `docker compose --profile cpu up -d` or `gpu`
## Deployment
Bootstrap the upstream CPU stack locally:
```bash
python3 scripts/ragflow_bootstrap.py --profile cpu
```
Dry-run only:
```bash
python3 scripts/ragflow_bootstrap.py --profile cpu --dry-run
```
Fetch files without launching Docker:
```bash
python3 scripts/ragflow_bootstrap.py --no-up
```
Default bundle target:
- `~/.hermes/services/ragflow`
## Runtime configuration
Optional environment variables:
- `RAGFLOW_API_URL` — defaults to `http://localhost:9380`
- `RAGFLOW_API_KEY` — Bearer token for authenticated RAGFlow APIs
## Supported document types
RAGFlow ingest accepts:
- PDF: `.pdf`
- Word: `.doc`, `.docx`
- Presentations: `.ppt`, `.pptx`
- Images via OCR: `.png`, `.jpg`, `.jpeg`, `.webp`, `.bmp`, `.tif`, `.tiff`, `.gif`
- Tables: `.csv`, `.tsv`
- Text and codebase documents: `.txt`, `.md`, `.rst`, `.html`, `.htm`, `.json`, `.yaml`, `.yml`, `.toml`, `.ini`, `.py`, `.js`, `.ts`, `.tsx`, `.jsx`, `.java`, `.go`, `.rs`, `.c`, `.cc`, `.cpp`, `.h`, `.hpp`, `.rb`, `.php`, `.sql`, `.sh`
## Example tool usage
```json
{"document_url":"https://arxiv.org/pdf/1706.03762.pdf","dataset":"research-papers"}
```
```json
{"query":"What does the paper say about attention heads?","dataset":"research-papers","limit":5}
```
## Use cases
- research papers
- technical documentation
- OCR-heavy image workflows
- ingested codebases and architecture docs

View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""Bootstrap an upstream RAGFlow Docker bundle for Hermes.
This script fetches the upstream RAGFlow docker bundle into a local directory
so operators can run `docker compose --profile cpu up -d` (or `gpu`) without
manually assembling the required files.
"""
from __future__ import annotations
import argparse
import subprocess
import urllib.request
from pathlib import Path
# Raw-file root of the upstream RAGFlow repository's docker/ directory.
UPSTREAM_BASE = "https://raw.githubusercontent.com/infiniflow/ragflow/main/docker"
# Bundle files mirrored locally, keyed by destination filename relative to
# the target directory.
UPSTREAM_FILES = {
    "docker-compose.yml": f"{UPSTREAM_BASE}/docker-compose.yml",
    "docker-compose-base.yml": f"{UPSTREAM_BASE}/docker-compose-base.yml",
    ".env": f"{UPSTREAM_BASE}/.env",
    "service_conf.yaml.template": f"{UPSTREAM_BASE}/service_conf.yaml.template",
    "entrypoint.sh": f"{UPSTREAM_BASE}/entrypoint.sh",
}
def materialize_bundle(target_dir: str | Path, overwrite: bool = False) -> list[Path]:
    """Download the upstream RAGFlow docker bundle into *target_dir*.

    Files that already exist are left untouched unless *overwrite* is true.
    Returns the path of every bundle file present when the call finishes.
    """
    bundle_dir = Path(target_dir).expanduser()
    bundle_dir.mkdir(parents=True, exist_ok=True)
    materialized: list[Path] = []
    for filename, source_url in UPSTREAM_FILES.items():
        destination = bundle_dir / filename
        if overwrite or not destination.exists():
            with urllib.request.urlopen(source_url, timeout=60) as response:
                destination.write_bytes(response.read())
            if filename == "entrypoint.sh":
                # The entrypoint must be executable for the container to boot.
                destination.chmod(0o755)
        materialized.append(destination)
    return materialized
def build_compose_command(target_dir: str | Path, profile: str = "cpu") -> list[str]:
return ["docker", "compose", "--profile", profile, "up", "-d"]
def run_compose(target_dir: str | Path, profile: str = "cpu", dry_run: bool = False) -> dict:
target = Path(target_dir).expanduser()
command = build_compose_command(target, profile=profile)
if dry_run:
return {"target_dir": str(target), "command": command, "executed": False}
subprocess.run(command, cwd=target, check=True)
return {"target_dir": str(target), "command": command, "executed": True}
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: fetch the bundle files, then optionally start the stack."""
    parser = argparse.ArgumentParser(description="Fetch and launch the upstream RAGFlow Docker bundle")
    parser.add_argument("--target-dir", default=str(Path.home() / ".hermes" / "services" / "ragflow"))
    parser.add_argument("--profile", choices=["cpu", "gpu"], default="cpu")
    parser.add_argument("--overwrite", action="store_true")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--no-up", action="store_true", help="Only fetch bundle files; do not run docker compose")
    options = parser.parse_args(argv)

    fetched = materialize_bundle(options.target_dir, overwrite=options.overwrite)
    print(f"Fetched {len(fetched)} RAGFlow docker files into {Path(options.target_dir).expanduser()}")
    if options.no_up:
        # Fetch-only mode: stop before touching docker.
        return 0

    outcome = run_compose(options.target_dir, profile=options.profile, dry_run=options.dry_run)
    print("Command:", " ".join(outcome["command"]))
    if outcome["executed"]:
        print("RAGFlow docker stack launch requested.")
    else:
        print("Dry run only; docker compose not executed.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
import importlib.util
import io
from pathlib import Path
from unittest.mock import patch
# Repository root, derived from this test file's on-disk location.
ROOT = Path(__file__).resolve().parent.parent
# The bootstrap script under test; it is loaded by path, not as a package import.
SCRIPT_PATH = ROOT / "scripts" / "ragflow_bootstrap.py"
def _load_module():
    """Import scripts/ragflow_bootstrap.py directly from its file path."""
    loaded_spec = importlib.util.spec_from_file_location("ragflow_bootstrap", SCRIPT_PATH)
    bootstrap = importlib.util.module_from_spec(loaded_spec)
    assert loaded_spec.loader is not None
    loaded_spec.loader.exec_module(bootstrap)
    return bootstrap
def test_materialize_bundle_downloads_required_upstream_artifacts(tmp_path):
    """Every required upstream bundle file is fetched into the target dir."""
    module = _load_module()

    def stub_urlopen(url, timeout=0):
        # Serve a tiny marker payload named after the requested file.
        filename = url.rsplit("/", 1)[-1]
        return io.BytesIO(f"# fetched {filename}\n".encode())

    with patch.object(module.urllib.request, "urlopen", side_effect=stub_urlopen):
        written = module.materialize_bundle(tmp_path)

    for expected in ("docker-compose.yml", "docker-compose-base.yml", ".env"):
        assert (tmp_path / expected).exists()
    assert "entrypoint.sh" in {path.name for path in written}
def test_build_compose_command_respects_profile_and_directory(tmp_path):
    """The compose command carries the requested profile and detached-up tail."""
    module = _load_module()
    gpu_command = module.build_compose_command(tmp_path, profile="gpu")
    assert gpu_command[:4] == ["docker", "compose", "--profile", "gpu"]
    assert gpu_command[-2:] == ["up", "-d"]

View File

@@ -1,39 +0,0 @@
"""Tests for binary_extensions helpers."""
from tools.binary_extensions import has_binary_extension, has_image_extension
def test_has_image_extension_png():
    """PNG is recognized regardless of case."""
    for sample in ("/tmp/test.png", "/tmp/test.PNG"):
        assert has_image_extension(sample) is True


def test_has_image_extension_jpg_variants():
    """Both .jpg and .jpeg spellings count, case-insensitively."""
    for sample in ("/tmp/test.jpg", "/tmp/test.jpeg", "/tmp/test.JPG"):
        assert has_image_extension(sample) is True


def test_has_image_extension_webp():
    assert has_image_extension("/tmp/test.webp") is True


def test_has_image_extension_gif():
    assert has_image_extension("/tmp/test.gif") is True


def test_has_image_extension_no_ext():
    """A path with no extension is never an image."""
    assert has_image_extension("/tmp/test") is False


def test_has_image_extension_non_image():
    """Non-image extensions are rejected."""
    for sample in ("/tmp/test.txt", "/tmp/test.exe", "/tmp/test.pdf"):
        assert has_image_extension(sample) is False


def test_has_binary_extension_includes_images():
    """All image extensions must also be in binary extensions."""
    for sample in ("/tmp/test.png", "/tmp/test.jpg", "/tmp/test.webp"):
        assert has_binary_extension(sample) is True

View File

@@ -294,67 +294,3 @@ class TestSearchHints:
class TestReadFileImageRouting:
    """Tests that image files are routed through vision analysis."""

    @patch("tools.file_tools._analyze_image_with_vision")
    def test_image_png_routes_to_vision(self, mock_analyze, tmp_path):
        # The vision helper is mocked out; only the extension-based routing
        # decision inside read_file_tool is exercised here.
        mock_analyze.return_value = json.dumps({"analysis": "test image"})
        img = tmp_path / "test.png"
        img.write_bytes(b"fake png data")
        from tools.file_tools import read_file_tool
        result = read_file_tool(str(img))
        mock_analyze.assert_called_once()
        assert json.loads(result)["analysis"] == "test image"

    @patch("tools.file_tools._analyze_image_with_vision")
    def test_image_jpeg_routes_to_vision(self, mock_analyze, tmp_path):
        # Same routing check for the .jpeg extension.
        mock_analyze.return_value = json.dumps({"analysis": "test image"})
        img = tmp_path / "test.jpeg"
        img.write_bytes(b"fake jpeg data")
        from tools.file_tools import read_file_tool
        result = read_file_tool(str(img))
        mock_analyze.assert_called_once()
        assert json.loads(result)["analysis"] == "test image"

    @patch("tools.file_tools._analyze_image_with_vision")
    def test_image_webp_routes_to_vision(self, mock_analyze, tmp_path):
        # Same routing check for the .webp extension.
        mock_analyze.return_value = json.dumps({"analysis": "test image"})
        img = tmp_path / "test.webp"
        img.write_bytes(b"fake webp data")
        from tools.file_tools import read_file_tool
        result = read_file_tool(str(img))
        mock_analyze.assert_called_once()
        assert json.loads(result)["analysis"] == "test image"

    def test_non_image_binary_blocked(self, tmp_path):
        # Non-image binaries must be rejected, not routed to vision.
        from tools.file_tools import read_file_tool
        exe = tmp_path / "test.exe"
        exe.write_bytes(b"fake exe data")
        result = json.loads(read_file_tool(str(exe)))
        assert "error" in result
        assert "Cannot read binary" in result["error"]
class TestAnalyzeImageWithVision:
    """Tests for the _analyze_image_with_vision helper."""

    def test_import_error_fallback(self):
        # Mapping the module name to None makes `from tools.vision_tools import …`
        # raise ImportError, forcing the helper's unavailable-backend error path.
        with patch.dict("sys.modules", {"tools.vision_tools": None}):
            from tools.file_tools import _analyze_image_with_vision
            result = json.loads(_analyze_image_with_vision("/tmp/test.png"))
            assert "error" in result
            assert "vision_analyze tool is not available" in result["error"]

View File

@@ -0,0 +1,122 @@
from __future__ import annotations
import importlib
import json
import sys
from pathlib import Path
from unittest.mock import patch
from tools.registry import registry
class _Response:
    """Minimal stand-in for a requests.Response object used by the fakes."""

    def __init__(self, payload: dict, status_code: int = 200):
        # Keep the raw payload; .text mirrors what a real response body holds.
        self._payload = payload
        self.status_code = status_code
        self.text = json.dumps(payload)

    def json(self):
        """Return the decoded payload, mirroring requests.Response.json()."""
        return self._payload

    def raise_for_status(self):
        """Raise on 4xx/5xx, mirroring requests.Response.raise_for_status()."""
        if self.status_code >= 400:
            raise RuntimeError(f"HTTP {self.status_code}")
def _reload_module():
    """Freshly import tools.ragflow_tool so its registration side effects rerun."""
    for tool_name in ("ragflow_ingest", "ragflow_query"):
        registry.deregister(tool_name)
    sys.modules.pop("tools.ragflow_tool", None)
    fresh = importlib.import_module("tools.ragflow_tool")
    return importlib.reload(fresh)
def test_ragflow_tools_register_and_support_document_formats():
    """Importing the module registers both tools and common extensions exist."""
    module = _reload_module()
    for tool_name in ("ragflow_ingest", "ragflow_query"):
        assert registry.get_entry(tool_name) is not None
    for extension in (".pdf", ".docx", ".png", ".md"):
        assert extension in module.SUPPORTED_EXTENSIONS
def test_ragflow_ingest_creates_dataset_uploads_and_starts_parse(tmp_path):
    # End-to-end happy path: the dataset lookup misses, a dataset is created
    # with the "paper" chunk method, the file is uploaded, and parsing starts.
    module = _reload_module()
    document = tmp_path / "paper.pdf"
    document.write_bytes(b"%PDF-1.7\n")
    calls: list[tuple[str, str, dict | None, dict | None]] = []

    def fake_request(method, url, *, headers=None, params=None, json=None, files=None, timeout=None):
        # Record every call so the request ordering can be asserted afterwards.
        calls.append((method, url, params, json))
        if method == "GET" and url.endswith("/api/v1/datasets"):
            # Lookup returns no datasets, forcing creation.
            return _Response({"code": 0, "data": []})
        if method == "POST" and url.endswith("/api/v1/datasets"):
            assert json["name"] == "research-papers"
            assert json["chunk_method"] == "paper"
            return _Response({"code": 0, "data": {"id": "ds-1", "name": "research-papers"}})
        if method == "POST" and url.endswith("/api/v1/datasets/ds-1/documents"):
            assert files and files[0][0] == "file"
            return _Response({"code": 0, "data": [{"id": "doc-1", "name": "paper.pdf"}]})
        if method == "POST" and url.endswith("/api/v1/datasets/ds-1/chunks"):
            assert json == {"document_ids": ["doc-1"]}
            return _Response({"code": 0})
        raise AssertionError(f"Unexpected request: {method} {url}")

    with patch("tools.ragflow_tool.requests.request", side_effect=fake_request):
        result = json.loads(module.ragflow_ingest_tool(str(document), dataset="research-papers"))
    assert result["dataset_id"] == "ds-1"
    assert result["document_ids"] == ["doc-1"]
    assert result["parse_started"] is True
    assert result["chunk_method"] == "paper"
    # The very first request must be the dataset lookup (GET).
    assert calls[0][0] == "GET"
def test_ragflow_query_retrieves_chunks_for_named_dataset(tmp_path=None):
    # Query path: resolve the dataset by name, then hit /retrieval with the
    # question, the resolved dataset id, and the clamped page size.
    module = _reload_module()

    def fake_request(method, url, *, headers=None, params=None, json=None, files=None, timeout=None):
        if method == "GET" and url.endswith("/api/v1/datasets"):
            # "tech-docs" is not id-shaped, so lookup must be by name.
            assert params == {"name": "tech-docs"}
            return _Response({"code": 0, "data": [{"id": "ds-9", "name": "tech-docs"}]})
        if method == "POST" and url.endswith("/api/v1/retrieval"):
            assert json["question"] == "How does parsing work?"
            assert json["dataset_ids"] == ["ds-9"]
            assert json["page_size"] == 2
            return _Response(
                {
                    "code": 0,
                    "data": {
                        "chunks": [
                            {
                                "content": "Parsing starts by uploading documents.",
                                "document_id": "doc-9",
                                "document_keyword": "guide.md",
                                "similarity": 0.98,
                            }
                        ],
                        "total": 1,
                    },
                }
            )
        raise AssertionError(f"Unexpected request: {method} {url}")

    with patch("tools.ragflow_tool.requests.request", side_effect=fake_request):
        result = json.loads(module.ragflow_query_tool("How does parsing work?", "tech-docs", limit=2))
    assert result["dataset_id"] == "ds-9"
    assert result["total"] == 1
    assert result["chunks"][0]["content"] == "Parsing starts by uploading documents."
def test_ragflow_ingest_rejects_unsupported_document_types(tmp_path):
    """An unsupported extension is rejected before any network activity."""
    module = _reload_module()
    unsupported = tmp_path / "binary.exe"
    unsupported.write_bytes(b"MZ")
    outcome = json.loads(module.ragflow_ingest_tool(str(unsupported), dataset="ignored"))
    assert "error" in outcome
    assert "Unsupported document type" in outcome["error"]

View File

@@ -34,22 +34,9 @@ BINARY_EXTENSIONS = frozenset({
})
IMAGE_EXTENSIONS = frozenset({
    ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".ico", ".webp", ".tiff", ".tif",
})


def has_binary_extension(path: str) -> bool:
    """Check if a file path has a binary extension. Pure string check, no I/O."""
    _, dot, tail = path.rpartition(".")
    if not dot:
        return False
    return ("." + tail).lower() in BINARY_EXTENSIONS


def has_image_extension(path: str) -> bool:
    """Check if a file path has an image extension. Pure string check, no I/O."""
    _, dot, tail = path.rpartition(".")
    if not dot:
        return False
    return ("." + tail).lower() in IMAGE_EXTENSIONS

View File

@@ -1893,13 +1893,11 @@ def browser_get_images(task_id: Optional[str] = None) -> str:
def browser_vision(question: str, annotate: bool = False, task_id: Optional[str] = None) -> str:
"""
Take a screenshot of the current page and analyze it with vision AI.
This tool captures what's visually displayed in the browser and sends it
to the configured vision model for analysis. When the active model is
natively multimodal (e.g. Gemma 4) it is used directly; otherwise the
auxiliary vision backend is used. Useful for understanding visual content
that the text-based snapshot may not capture (CAPTCHAs, verification
challenges, images, complex layouts, etc.).
to Gemini for analysis. Useful for understanding visual content that the
text-based snapshot may not capture (CAPTCHAs, verification challenges,
images, complex layouts, etc.).
The screenshot is saved persistently and its file path is returned alongside
the analysis, so it can be shared with users via MEDIA:<path> in the response.

View File

@@ -7,7 +7,7 @@ import logging
import os
import threading
from pathlib import Path
from tools.binary_extensions import has_binary_extension, has_image_extension
from tools.binary_extensions import has_binary_extension
from tools.file_operations import ShellFileOperations
from agent.redact import redact_sensitive_text
@@ -279,52 +279,6 @@ def clear_file_ops_cache(task_id: str = None):
_file_ops_cache.clear()
def _analyze_image_with_vision(image_path: str, task_id: str = "default") -> str:
    """Route an image file through the vision analysis pipeline.

    Uses vision_analyze_tool with a default descriptive prompt. Falls back
    to a manual error when no vision backend is available.
    """
    import asyncio

    try:
        from tools.vision_tools import vision_analyze_tool
    except ImportError:
        # No vision backend installed — surface a structured error instead.
        return json.dumps({
            "error": (
                f"Image file '{image_path}' detected but vision_analyze tool "
                "is not available. Use vision_analyze directly if configured."
            ),
        })

    question = (
        "Describe this image in detail. If it contains text, transcribe "
        "the text. If it is a diagram, chart, or UI screenshot, describe "
        "the layout, colors, labels, and any visible data."
    )
    try:
        raw = asyncio.run(vision_analyze_tool(image_url=image_path, question=question))
    except Exception as exc:
        return json.dumps({
            "error": (
                f"Image file '{image_path}' detected but vision analysis failed: {exc}. "
                "Use vision_analyze directly if configured."
            ),
        })

    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        decoded = {"content": raw}
    # Wrap the vision result so the caller knows it came from image analysis
    return json.dumps({
        "image_path": image_path,
        "analysis": decoded.get("content") or decoded.get("analysis") or raw,
        "source": "vision_analyze",
    }, ensure_ascii=False)
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
"""Read a file with pagination and line numbers."""
try:
@@ -341,13 +295,10 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
_resolved = Path(path).expanduser().resolve()
# ── Binary / image file guard ─────────────────────────────────
# Block binary files by extension (no I/O). Images are routed
# through the vision analysis pipeline when a backend is available.
# ── Binary file guard ─────────────────────────────────────────
# Block binary files by extension (no I/O).
if has_binary_extension(str(_resolved)):
_ext = _resolved.suffix.lower()
if has_image_extension(str(_resolved)):
return _analyze_image_with_vision(str(_resolved), task_id=task_id)
return json.dumps({
"error": (
f"Cannot read binary file '{path}' ({_ext}). "
@@ -778,7 +729,7 @@ def _check_file_reqs():
READ_FILE_SCHEMA = {
"name": "read_file",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Image files (PNG, JPEG, WebP, GIF, etc.) are automatically analyzed via vision_analyze. Other binary files cannot be read as text.",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.",
"parameters": {
"type": "object",
"properties": {

344
tools/ragflow_tool.py Normal file
View File

@@ -0,0 +1,344 @@
#!/usr/bin/env python3
"""RAGFlow tool integration for document understanding.
Provides two tools:
- ragflow_ingest(document_url, dataset): upload and parse a document into RAGFlow
- ragflow_query(query, dataset): retrieve relevant chunks from a dataset
Default deployment target is a local RAGFlow server on http://localhost:9380.
"""
from __future__ import annotations
import json
import mimetypes
import os
import tempfile
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import requests
from tools.registry import registry, tool_error, tool_result
# Tool-call JSON schema advertised to the model for ragflow_ingest.
RAGFLOW_INGEST_SCHEMA = {
    "name": "ragflow_ingest",
    "description": (
        "Upload a document into a RAGFlow dataset, creating the dataset if needed, "
        "then trigger parsing so Hermes can query the content later. Supports PDF, "
        "Word, images via OCR, plus text and code documents."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "document_url": {
                "type": "string",
                "description": "HTTP(S) URL, file:// URL, or local filesystem path to the document.",
            },
            "dataset": {
                "type": "string",
                "description": "Dataset name or id to ingest into. Created automatically when absent.",
            },
        },
        "required": ["document_url", "dataset"],
    },
}
# Tool-call JSON schema advertised to the model for ragflow_query.
RAGFLOW_QUERY_SCHEMA = {
    "name": "ragflow_query",
    "description": (
        "Query a RAGFlow dataset for relevant chunks. Useful for research papers, "
        "technical docs, OCR-processed images, and ingested codebase documents."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Question or search query to run against RAGFlow.",
            },
            "dataset": {
                "type": "string",
                "description": "Dataset name or id to search.",
            },
            "limit": {
                "type": "integer",
                "description": "Maximum number of chunks to return.",
                "default": 5,
                "minimum": 1,
                "maximum": 25,
            },
        },
        "required": ["query", "dataset"],
    },
}
# Maps each ingestible file extension to the RAGFlow chunk_method used when a
# dataset is auto-created for that document type.
SUPPORTED_EXTENSIONS = {
    # Long-form documents parsed with the "paper" method.
    ".pdf": "paper",
    ".doc": "paper",
    ".docx": "paper",
    # Slide decks.
    ".ppt": "presentation",
    ".pptx": "presentation",
    # Images, handled via OCR ("picture").
    ".png": "picture",
    ".jpg": "picture",
    ".jpeg": "picture",
    ".webp": "picture",
    ".bmp": "picture",
    ".tif": "picture",
    ".tiff": "picture",
    ".gif": "picture",
    # Plain text and markup.
    ".txt": "naive",
    ".md": "naive",
    ".rst": "naive",
    ".html": "naive",
    ".htm": "naive",
    # Tabular data.
    ".csv": "table",
    ".tsv": "table",
    # Config formats, treated as plain text.
    ".json": "naive",
    ".yaml": "naive",
    ".yml": "naive",
    ".toml": "naive",
    ".ini": "naive",
    # Source code, treated as plain text.
    ".py": "naive",
    ".js": "naive",
    ".ts": "naive",
    ".tsx": "naive",
    ".jsx": "naive",
    ".java": "naive",
    ".go": "naive",
    ".rs": "naive",
    ".c": "naive",
    ".cc": "naive",
    ".cpp": "naive",
    ".h": "naive",
    ".hpp": "naive",
    ".rb": "naive",
    ".php": "naive",
    ".sql": "naive",
    ".sh": "naive",
}
def _ragflow_base_url() -> str:
    """RAGFlow server base URL from RAGFLOW_API_URL, without a trailing slash."""
    configured = os.getenv("RAGFLOW_API_URL", "http://localhost:9380")
    return configured.rstrip("/")
def _ragflow_headers(json_body: bool = True) -> dict[str, str]:
    """Build request headers; adds a Bearer token when RAGFLOW_API_KEY is set."""
    api_key = os.getenv("RAGFLOW_API_KEY", "").strip()
    headers: dict[str, str] = {"Authorization": f"Bearer {api_key}"} if api_key else {}
    if json_body:
        headers["Content-Type"] = "application/json"
    return headers
def _ragflow_check_requirements() -> bool:
    # Registry availability probe: always enabled. Both environment variables
    # are optional at runtime (the URL defaults to localhost and the API key
    # is only attached when present), so there is nothing to verify here.
    return True
def _request_json(method: str, path: str, *, params=None, json_payload=None, files=None) -> dict[str, Any]:
    """Issue one RAGFlow API call and return the decoded JSON envelope.

    Raises for HTTP-level errors (raise_for_status) and for application-level
    failures signalled by a non-zero "code" field in the envelope.
    """
    url = f"{_ragflow_base_url()}{path}"
    response = requests.request(
        method,
        url,
        # Multipart uploads must not carry a JSON Content-Type header.
        headers=_ragflow_headers(json_body=files is None),
        params=params,
        json=json_payload,
        files=files,
        timeout=120,
    )
    response.raise_for_status()
    envelope = response.json()
    if envelope.get("code", 0) != 0:
        raise RuntimeError(envelope.get("message") or envelope.get("error") or "RAGFlow request failed")
    return envelope
def _is_probable_dataset_id(dataset: str) -> bool:
    """Heuristic: hyphen-stripped alphanumeric strings of >=16 chars look like ids."""
    candidate = dataset.replace("-", "")
    if len(candidate) < 16:
        return False
    return all(character.isalnum() for character in candidate)
def _resolve_dataset(dataset: str) -> tuple[str, str] | None:
    """Look up a dataset by id or name; return (id, name) or None when absent."""
    wanted = dataset.strip()
    if not wanted:
        return None
    # Id-shaped strings are queried by id, everything else by name.
    lookup = {"id": wanted} if _is_probable_dataset_id(wanted) else {"name": wanted}
    listing = _request_json("GET", "/api/v1/datasets", params=lookup).get("data") or []
    if not listing:
        return None
    first = listing[0]
    return first["id"], first.get("name", wanted)
def _ensure_dataset(dataset: str, chunk_method: str) -> tuple[str, str]:
    """Return (id, name) for *dataset*, creating it with *chunk_method* if missing."""
    existing = _resolve_dataset(dataset)
    if existing is not None:
        return existing
    created = _request_json(
        "POST",
        "/api/v1/datasets",
        json_payload={"name": dataset, "chunk_method": chunk_method},
    ).get("data") or {}
    return created["id"], created.get("name", dataset)
def _prepare_document(document_url: str) -> tuple[Path, bool]:
    """Resolve *document_url* to a local file.

    Returns (path, is_temporary). HTTP(S) sources are downloaded into a temp
    file which the caller is responsible for deleting.
    """
    parts = urlparse(document_url)
    if parts.scheme in {"http", "https"}:
        download = requests.get(document_url, timeout=120)
        download.raise_for_status()
        # Preserve the URL's extension so chunk-method detection still works.
        extension = Path(parts.path).suffix or ".bin"
        with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as handle:
            handle.write(download.content)
        return Path(handle.name), True
    if parts.scheme == "file":
        return Path(parts.path), False
    return Path(document_url).expanduser(), False
def _detect_chunk_method(path: Path) -> str:
    """Map *path*'s extension to a RAGFlow chunk_method; raise ValueError otherwise."""
    suffix = path.suffix.lower()
    if suffix in SUPPORTED_EXTENSIONS:
        return SUPPORTED_EXTENSIONS[suffix]
    supported = ", ".join(sorted(SUPPORTED_EXTENSIONS))
    raise ValueError(f"Unsupported document type '{suffix or path.name}'. Supported document types: {supported}")
def _upload_document(dataset_id: str, path: Path) -> list[str]:
    """Upload one local file into the dataset; return the ids RAGFlow assigned."""
    content_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
    with path.open("rb") as handle:
        envelope = _request_json(
            "POST",
            f"/api/v1/datasets/{dataset_id}/documents",
            files=[("file", (path.name, handle, content_type))],
        )
    uploaded = envelope.get("data") or []
    document_ids = [entry["id"] for entry in uploaded if entry.get("id")]
    if not document_ids:
        raise RuntimeError("RAGFlow upload did not return any document ids")
    return document_ids
def ragflow_ingest_tool(document_url: str, dataset: str) -> str:
    """Ingest *document_url* into *dataset*: resolve/create the dataset, upload
    the file, and kick off parsing.

    Returns a JSON tool result string; temporary downloads are always removed.
    """
    staged: Path | None = None
    temporary = False
    try:
        staged, temporary = _prepare_document(document_url)
        if not staged.exists():
            return tool_error(f"Document not found: {document_url}")
        chunk_method = _detect_chunk_method(staged)
        dataset_id, dataset_name = _ensure_dataset(dataset, chunk_method)
        document_ids = _upload_document(dataset_id, staged)
        # Start asynchronous parsing of the freshly uploaded documents.
        _request_json(
            "POST",
            f"/api/v1/datasets/{dataset_id}/chunks",
            json_payload={"document_ids": document_ids},
        )
        return tool_result(
            success=True,
            dataset_id=dataset_id,
            dataset_name=dataset_name,
            document_ids=document_ids,
            parse_started=True,
            chunk_method=chunk_method,
            source=document_url,
            filename=staged.name,
        )
    except ValueError as exc:
        # Unsupported-extension errors carry a user-facing message already.
        return tool_error(str(exc))
    except Exception as exc:
        return tool_error(f"RAGFlow ingest failed: {exc}")
    finally:
        if temporary and staged is not None:
            try:
                staged.unlink(missing_ok=True)
            except Exception:
                pass
def _normalize_chunks(chunks: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Project raw RAGFlow chunk records onto a stable, minimal shape."""
    return [
        {
            "content": chunk.get("content", ""),
            "document_id": chunk.get("document_id", ""),
            "document_name": chunk.get("document_keyword", ""),
            "similarity": chunk.get("similarity"),
            "highlight": chunk.get("highlight", ""),
        }
        for chunk in chunks
    ]
def ragflow_query_tool(query: str, dataset: str, limit: int = 5) -> str:
    """Retrieve up to *limit* chunks (clamped to 1..25) from *dataset* for *query*."""
    try:
        located = _resolve_dataset(dataset)
        if not located:
            return tool_error(f"RAGFlow dataset not found: {dataset}")
        dataset_id, dataset_name = located
        envelope = _request_json(
            "POST",
            "/api/v1/retrieval",
            json_payload={
                "question": query,
                "dataset_ids": [dataset_id],
                "page_size": max(1, min(int(limit), 25)),
                "highlight": True,
                "keyword": True,
            },
        )
        body = envelope.get("data") or {}
        matches = body.get("chunks") or []
        return tool_result(
            success=True,
            dataset_id=dataset_id,
            dataset_name=dataset_name,
            total=body.get("total", len(matches)),
            chunks=_normalize_chunks(matches),
        )
    except Exception as exc:
        return tool_error(f"RAGFlow query failed: {exc}")
def _handle_ragflow_ingest(args, **_kwargs):
    """Registry adapter: unpack tool-call arguments for ragflow_ingest_tool."""
    document_url = args.get("document_url", "")
    dataset = args.get("dataset", "")
    return ragflow_ingest_tool(document_url=document_url, dataset=dataset)
def _handle_ragflow_query(args, **_kwargs):
    """Registry adapter: unpack tool-call arguments for ragflow_query_tool."""
    return ragflow_query_tool(
        query=args.get("query", ""),
        dataset=args.get("dataset", ""),
        limit=args.get("limit", 5),
    )
# Register both tools under the "web" toolset at import time.
# NOTE(review): requires_env lists RAGFLOW_API_URL and RAGFLOW_API_KEY even
# though both are optional at runtime (the URL has a localhost default and the
# key is only attached when set) — confirm the registry treats requires_env as
# informational rather than mandatory.
registry.register(
    name="ragflow_ingest",
    toolset="web",
    schema=RAGFLOW_INGEST_SCHEMA,
    handler=_handle_ragflow_ingest,
    check_fn=_ragflow_check_requirements,
    requires_env=["RAGFLOW_API_URL", "RAGFLOW_API_KEY"],
    emoji="📚",
)
registry.register(
    name="ragflow_query",
    toolset="web",
    schema=RAGFLOW_QUERY_SCHEMA,
    handler=_handle_ragflow_query,
    check_fn=_ragflow_check_requirements,
    requires_env=["RAGFLOW_API_URL", "RAGFLOW_API_KEY"],
    emoji="🧠",
)