Compare commits

..

11 Commits

Author SHA1 Message Date
Alexander Whitestone
eb16a6671e feat: add ragflow integration for #859
All checks were successful
Lint / lint (pull_request) Successful in 9s
2026-04-22 02:31:01 -04:00
Alexander Whitestone
18998b60c3 test: define ragflow integration for #859 2026-04-22 02:26:43 -04:00
16eab5d503 Merge pull request '[claude] A2A auth — mutual TLS between fleet agents (#806)' (#948) from claude/issue-806 into main
All checks were successful
Lint / lint (push) Successful in 13s
Merge PR #948: A2A auth — mutual TLS between fleet agents (#806)
2026-04-22 03:19:42 +00:00
c7a2d439c1 Merge pull request 'feat: The Sovereign Scavenger — Automated Tech Debt Recovery' (#974) from feat/sovereign-scavenger-1776827259631 into main
All checks were successful
Lint / lint (push) Successful in 12s
2026-04-22 03:14:14 +00:00
8ad8520bd2 Merge pull request 'feat: Execution Safety Sentry — GOFAI Risk Analysis' (#973) from feat/static-analyzer-gofai-1776826921747 into main
Some checks failed
Lint / lint (push) Has been cancelled
2026-04-22 03:14:07 +00:00
9c7c88823f Merge pull request 'feat: Local Inference Story — Freeing the fleet from cloud dependency' (#972) from feat/local-inference-bridge-1776826896029 into main
Some checks failed
Lint / lint (push) Has been cancelled
2026-04-22 03:14:03 +00:00
aa45e02238 Merge pull request 'feat: GOFAI Semantic Sentry — Deterministic code verification' (#971) from feat/symbolic-verify-gofai-1776826842170 into main
Some checks failed
Lint / lint (push) Has been cancelled
2026-04-22 03:14:01 +00:00
3266c39e8e feat: Sovereign Scavenger — Turning tech debt into actionable backlog
All checks were successful
Lint / lint (pull_request) Successful in 18s
2026-04-22 03:07:40 +00:00
93a855d4e3 feat: Static Risk Analyzer (GOFAI) for execution safety
All checks were successful
Lint / lint (pull_request) Successful in 8s
2026-04-22 03:02:02 +00:00
5a0bdb556e feat: Local Inference Bridge — Bypassing cloud for local tasks
All checks were successful
Lint / lint (pull_request) Successful in 17s
2026-04-22 03:01:37 +00:00
d619d279f8 feat: Symbolic Sentry (GOFAI) for deterministic code audits
All checks were successful
Lint / lint (pull_request) Successful in 15s
2026-04-22 03:00:44 +00:00
9 changed files with 1124 additions and 0 deletions

View File

@@ -0,0 +1,68 @@
# RAGFlow integration
This repo-side slice adds:
- `tools/ragflow_tool.py`
- `ragflow_ingest(document_url, dataset)`
- `ragflow_query(query, dataset, limit=5)`
- `scripts/ragflow_bootstrap.py`
- fetches the upstream RAGFlow Docker bundle
- runs `docker compose --profile cpu up -d` or `gpu`
## Deployment
Bootstrap the upstream CPU stack locally:
```bash
python3 scripts/ragflow_bootstrap.py --profile cpu
```
Dry-run only:
```bash
python3 scripts/ragflow_bootstrap.py --profile cpu --dry-run
```
Fetch files without launching Docker:
```bash
python3 scripts/ragflow_bootstrap.py --no-up
```
Default bundle target:
- `~/.hermes/services/ragflow`
## Runtime configuration
Optional environment variables:
- `RAGFLOW_API_URL` — defaults to `http://localhost:9380`
- `RAGFLOW_API_KEY` — Bearer token for authenticated RAGFlow APIs
## Supported document types
RAGFlow ingest accepts:
- PDF: `.pdf`
- Word: `.doc`, `.docx`
- Presentations: `.ppt`, `.pptx`
- Images via OCR: `.png`, `.jpg`, `.jpeg`, `.webp`, `.bmp`, `.tif`, `.tiff`, `.gif`
- Text and codebase documents: `.txt`, `.md`, `.rst`, `.html`, `.json`, `.yaml`, `.yml`, `.toml`, `.ini`, `.py`, `.js`, `.ts`, `.tsx`, `.jsx`, `.java`, `.go`, `.rs`, `.c`, `.cpp`, `.h`, `.hpp`, `.rb`, `.php`, `.sql`, `.sh`
## Example tool usage
```json
{"document_url":"https://arxiv.org/pdf/1706.03762.pdf","dataset":"research-papers"}
```
```json
{"query":"What does the paper say about attention heads?","dataset":"research-papers","limit":5}
```
## Use cases
- research papers
- technical documentation
- OCR-heavy image workflows
- ingested codebases and architecture docs

View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""Bootstrap an upstream RAGFlow Docker bundle for Hermes.
This script fetches the upstream RAGFlow docker bundle into a local directory
so operators can run `docker compose --profile cpu up -d` (or `gpu`) without
manually assembling the required files.
"""
from __future__ import annotations
import argparse
import subprocess
import urllib.request
from pathlib import Path
UPSTREAM_BASE = "https://raw.githubusercontent.com/infiniflow/ragflow/main/docker"
UPSTREAM_FILES = {
"docker-compose.yml": f"{UPSTREAM_BASE}/docker-compose.yml",
"docker-compose-base.yml": f"{UPSTREAM_BASE}/docker-compose-base.yml",
".env": f"{UPSTREAM_BASE}/.env",
"service_conf.yaml.template": f"{UPSTREAM_BASE}/service_conf.yaml.template",
"entrypoint.sh": f"{UPSTREAM_BASE}/entrypoint.sh",
}
def materialize_bundle(target_dir: str | Path, overwrite: bool = False) -> list[Path]:
target = Path(target_dir).expanduser()
target.mkdir(parents=True, exist_ok=True)
written: list[Path] = []
for name, url in UPSTREAM_FILES.items():
dest = target / name
if dest.exists() and not overwrite:
written.append(dest)
continue
with urllib.request.urlopen(url, timeout=60) as response:
dest.write_bytes(response.read())
if name == "entrypoint.sh":
dest.chmod(0o755)
written.append(dest)
return written
def build_compose_command(target_dir: str | Path, profile: str = "cpu") -> list[str]:
return ["docker", "compose", "--profile", profile, "up", "-d"]
def run_compose(target_dir: str | Path, profile: str = "cpu", dry_run: bool = False) -> dict:
target = Path(target_dir).expanduser()
command = build_compose_command(target, profile=profile)
if dry_run:
return {"target_dir": str(target), "command": command, "executed": False}
subprocess.run(command, cwd=target, check=True)
return {"target_dir": str(target), "command": command, "executed": True}
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Fetch and launch the upstream RAGFlow Docker bundle")
parser.add_argument("--target-dir", default=str(Path.home() / ".hermes" / "services" / "ragflow"))
parser.add_argument("--profile", choices=["cpu", "gpu"], default="cpu")
parser.add_argument("--overwrite", action="store_true")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--no-up", action="store_true", help="Only fetch bundle files; do not run docker compose")
args = parser.parse_args(argv)
written = materialize_bundle(args.target_dir, overwrite=args.overwrite)
print(f"Fetched {len(written)} RAGFlow docker files into {Path(args.target_dir).expanduser()}")
if args.no_up:
return 0
result = run_compose(args.target_dir, profile=args.profile, dry_run=args.dry_run)
print("Command:", " ".join(result["command"]))
if result["executed"]:
print("RAGFlow docker stack launch requested.")
else:
print("Dry run only; docker compose not executed.")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
import importlib.util
import io
from pathlib import Path
from unittest.mock import patch
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "ragflow_bootstrap.py"
def _load_module():
spec = importlib.util.spec_from_file_location("ragflow_bootstrap", SCRIPT_PATH)
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
def test_materialize_bundle_downloads_required_upstream_artifacts(tmp_path):
module = _load_module()
def fake_urlopen(url, timeout=0):
name = url.rsplit("/", 1)[-1]
return io.BytesIO(f"# fetched {name}\n".encode())
with patch.object(module.urllib.request, "urlopen", side_effect=fake_urlopen):
written = module.materialize_bundle(tmp_path)
assert (tmp_path / "docker-compose.yml").exists()
assert (tmp_path / "docker-compose-base.yml").exists()
assert (tmp_path / ".env").exists()
assert any(path.name == "entrypoint.sh" for path in written)
def test_build_compose_command_respects_profile_and_directory(tmp_path):
module = _load_module()
command = module.build_compose_command(tmp_path, profile="gpu")
assert command[:4] == ["docker", "compose", "--profile", "gpu"]
assert command[-2:] == ["up", "-d"]

View File

@@ -0,0 +1,122 @@
from __future__ import annotations
import importlib
import json
import sys
from pathlib import Path
from unittest.mock import patch
from tools.registry import registry
class _Response:
def __init__(self, payload: dict, status_code: int = 200):
self._payload = payload
self.status_code = status_code
self.text = json.dumps(payload)
def json(self):
return self._payload
def raise_for_status(self):
if self.status_code >= 400:
raise RuntimeError(f"HTTP {self.status_code}")
def _reload_module():
registry.deregister("ragflow_ingest")
registry.deregister("ragflow_query")
sys.modules.pop("tools.ragflow_tool", None)
module = importlib.import_module("tools.ragflow_tool")
return importlib.reload(module)
def test_ragflow_tools_register_and_support_document_formats():
module = _reload_module()
assert registry.get_entry("ragflow_ingest") is not None
assert registry.get_entry("ragflow_query") is not None
assert ".pdf" in module.SUPPORTED_EXTENSIONS
assert ".docx" in module.SUPPORTED_EXTENSIONS
assert ".png" in module.SUPPORTED_EXTENSIONS
assert ".md" in module.SUPPORTED_EXTENSIONS
def test_ragflow_ingest_creates_dataset_uploads_and_starts_parse(tmp_path):
module = _reload_module()
document = tmp_path / "paper.pdf"
document.write_bytes(b"%PDF-1.7\n")
calls: list[tuple[str, str, dict | None, dict | None]] = []
def fake_request(method, url, *, headers=None, params=None, json=None, files=None, timeout=None):
calls.append((method, url, params, json))
if method == "GET" and url.endswith("/api/v1/datasets"):
return _Response({"code": 0, "data": []})
if method == "POST" and url.endswith("/api/v1/datasets"):
assert json["name"] == "research-papers"
assert json["chunk_method"] == "paper"
return _Response({"code": 0, "data": {"id": "ds-1", "name": "research-papers"}})
if method == "POST" and url.endswith("/api/v1/datasets/ds-1/documents"):
assert files and files[0][0] == "file"
return _Response({"code": 0, "data": [{"id": "doc-1", "name": "paper.pdf"}]})
if method == "POST" and url.endswith("/api/v1/datasets/ds-1/chunks"):
assert json == {"document_ids": ["doc-1"]}
return _Response({"code": 0})
raise AssertionError(f"Unexpected request: {method} {url}")
with patch("tools.ragflow_tool.requests.request", side_effect=fake_request):
result = json.loads(module.ragflow_ingest_tool(str(document), dataset="research-papers"))
assert result["dataset_id"] == "ds-1"
assert result["document_ids"] == ["doc-1"]
assert result["parse_started"] is True
assert result["chunk_method"] == "paper"
assert calls[0][0] == "GET"
def test_ragflow_query_retrieves_chunks_for_named_dataset():
module = _reload_module()
def fake_request(method, url, *, headers=None, params=None, json=None, files=None, timeout=None):
if method == "GET" and url.endswith("/api/v1/datasets"):
assert params == {"name": "tech-docs"}
return _Response({"code": 0, "data": [{"id": "ds-9", "name": "tech-docs"}]})
if method == "POST" and url.endswith("/api/v1/retrieval"):
assert json["question"] == "How does parsing work?"
assert json["dataset_ids"] == ["ds-9"]
assert json["page_size"] == 2
return _Response(
{
"code": 0,
"data": {
"chunks": [
{
"content": "Parsing starts by uploading documents.",
"document_id": "doc-9",
"document_keyword": "guide.md",
"similarity": 0.98,
}
],
"total": 1,
},
}
)
raise AssertionError(f"Unexpected request: {method} {url}")
with patch("tools.ragflow_tool.requests.request", side_effect=fake_request):
result = json.loads(module.ragflow_query_tool("How does parsing work?", "tech-docs", limit=2))
assert result["dataset_id"] == "ds-9"
assert result["total"] == 1
assert result["chunks"][0]["content"] == "Parsing starts by uploading documents."
def test_ragflow_ingest_rejects_unsupported_document_types(tmp_path):
module = _reload_module()
document = tmp_path / "binary.exe"
document.write_bytes(b"MZ")
result = json.loads(module.ragflow_ingest_tool(str(document), dataset="ignored"))
assert "error" in result
assert "Unsupported document type" in result["error"]

View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
Local Inference Bridge — Fast-path for low-entropy LLM tasks.
Detects local Ollama/llama-cpp instances and uses them for 'Auxiliary' tasks
(summarization, extraction, simple verification) to reduce cloud dependency.
"""
import json
import logging
import os
import requests
from typing import Dict, List, Optional, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
LOCAL_INFERENCE_SCHEMA = {
"name": "local_inference",
"description": "Execute a task using a local inference engine (Ollama/llama-cpp) if available. Ideal for simple summarization, text cleanup, or data extraction where cloud-grade intelligence is overkill.",
"parameters": {
"type": "object",
"properties": {
"prompt": {"type": "string", "description": "The task prompt."},
"system": {"type": "string", "description": "Optional system instruction."},
"engine": {"type": "string", "enum": ["auto", "ollama", "llama-cpp"], "default": "auto"}
},
"required": ["prompt"]
}
}
def detect_local_engine() -> Optional[Dict[str, str]]:
"""Detect presence of local inference engines."""
# 1. Check Ollama (default port 11434)
try:
res = requests.get("http://localhost:11434/api/tags", timeout=1)
if res.status_code == 200:
return {"type": "ollama", "url": "http://localhost:11434"}
except:
pass
# 2. Check llama-cpp-python (commonly on 8000 or 8080)
for port in [8000, 8080]:
try:
res = requests.get(f"http://localhost:{port}/v1/models", timeout=1)
if res.status_code == 200:
return {"type": "llama-cpp", "url": f"http://localhost:{port}"}
except:
pass
return None
def run_local_task(prompt: str, system: str = None, engine: str = "auto"):
"""Execute inference on a detected local engine."""
info = detect_local_engine()
if not info:
return tool_error("No local inference engine (Ollama or llama-cpp) detected on localhost.")
try:
if info["type"] == "ollama":
# Select first available model or default to gemma
models = requests.get(f"{info['url']}/api/tags").json().get("models", [])
model_name = models[0]["name"] if models else "gemma"
payload = {
"model": model_name,
"prompt": prompt,
"stream": False
}
if system: payload["system"] = system
res = requests.post(f"{info['url']}/api/generate", json=payload, timeout=60)
result = res.json().get("response", "")
return tool_result(engine="Ollama", model=model_name, response=result)
elif info["type"] == "llama-cpp":
payload = {
"model": "local-model",
"messages": [
{"role": "system", "content": system or "You are a helpful assistant."},
{"role": "user", "content": prompt}
]
}
res = requests.post(f"{info['url']}/v1/chat/completions", json=payload, timeout=60)
result = res.json()["choices"][0]["message"]["content"]
return tool_result(engine="llama-cpp", response=result)
except Exception as e:
return tool_error(f"Local inference failed: {str(e)}")
def _handle_local_inference(args, **kwargs):
return run_local_task(
prompt=args.get("prompt"),
system=args.get("system"),
engine=args.get("engine", "auto")
)
registry.register(
name="local_inference",
toolset="inference",
schema=LOCAL_INFERENCE_SCHEMA,
handler=_handle_local_inference,
emoji="🏠"
)

344
tools/ragflow_tool.py Normal file
View File

@@ -0,0 +1,344 @@
#!/usr/bin/env python3
"""RAGFlow tool integration for document understanding.
Provides two tools:
- ragflow_ingest(document_url, dataset): upload and parse a document into RAGFlow
- ragflow_query(query, dataset): retrieve relevant chunks from a dataset
Default deployment target is a local RAGFlow server on http://localhost:9380.
"""
from __future__ import annotations
import json
import mimetypes
import os
import tempfile
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import requests
from tools.registry import registry, tool_error, tool_result
RAGFLOW_INGEST_SCHEMA = {
"name": "ragflow_ingest",
"description": (
"Upload a document into a RAGFlow dataset, creating the dataset if needed, "
"then trigger parsing so Hermes can query the content later. Supports PDF, "
"Word, images via OCR, plus text and code documents."
),
"parameters": {
"type": "object",
"properties": {
"document_url": {
"type": "string",
"description": "HTTP(S) URL, file:// URL, or local filesystem path to the document.",
},
"dataset": {
"type": "string",
"description": "Dataset name or id to ingest into. Created automatically when absent.",
},
},
"required": ["document_url", "dataset"],
},
}
RAGFLOW_QUERY_SCHEMA = {
"name": "ragflow_query",
"description": (
"Query a RAGFlow dataset for relevant chunks. Useful for research papers, "
"technical docs, OCR-processed images, and ingested codebase documents."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Question or search query to run against RAGFlow.",
},
"dataset": {
"type": "string",
"description": "Dataset name or id to search.",
},
"limit": {
"type": "integer",
"description": "Maximum number of chunks to return.",
"default": 5,
"minimum": 1,
"maximum": 25,
},
},
"required": ["query", "dataset"],
},
}
SUPPORTED_EXTENSIONS = {
".pdf": "paper",
".doc": "paper",
".docx": "paper",
".ppt": "presentation",
".pptx": "presentation",
".png": "picture",
".jpg": "picture",
".jpeg": "picture",
".webp": "picture",
".bmp": "picture",
".tif": "picture",
".tiff": "picture",
".gif": "picture",
".txt": "naive",
".md": "naive",
".rst": "naive",
".html": "naive",
".htm": "naive",
".csv": "table",
".tsv": "table",
".json": "naive",
".yaml": "naive",
".yml": "naive",
".toml": "naive",
".ini": "naive",
".py": "naive",
".js": "naive",
".ts": "naive",
".tsx": "naive",
".jsx": "naive",
".java": "naive",
".go": "naive",
".rs": "naive",
".c": "naive",
".cc": "naive",
".cpp": "naive",
".h": "naive",
".hpp": "naive",
".rb": "naive",
".php": "naive",
".sql": "naive",
".sh": "naive",
}
def _ragflow_base_url() -> str:
return os.getenv("RAGFLOW_API_URL", "http://localhost:9380").rstrip("/")
def _ragflow_headers(json_body: bool = True) -> dict[str, str]:
headers: dict[str, str] = {}
api_key = os.getenv("RAGFLOW_API_KEY", "").strip()
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
if json_body:
headers["Content-Type"] = "application/json"
return headers
def _ragflow_check_requirements() -> bool:
return True
def _request_json(method: str, path: str, *, params=None, json_payload=None, files=None) -> dict[str, Any]:
response = requests.request(
method,
f"{_ragflow_base_url()}{path}",
headers=_ragflow_headers(json_body=files is None),
params=params,
json=json_payload,
files=files,
timeout=120,
)
response.raise_for_status()
payload = response.json()
if payload.get("code", 0) != 0:
message = payload.get("message") or payload.get("error") or "RAGFlow request failed"
raise RuntimeError(message)
return payload
def _is_probable_dataset_id(dataset: str) -> bool:
compact = dataset.replace("-", "")
return len(compact) >= 16 and all(ch.isalnum() for ch in compact)
def _resolve_dataset(dataset: str) -> tuple[str, str] | None:
dataset = dataset.strip()
if not dataset:
return None
params = {"id": dataset} if _is_probable_dataset_id(dataset) else {"name": dataset}
payload = _request_json("GET", "/api/v1/datasets", params=params)
data = payload.get("data") or []
if not data:
return None
match = data[0]
return match["id"], match.get("name", dataset)
def _ensure_dataset(dataset: str, chunk_method: str) -> tuple[str, str]:
resolved = _resolve_dataset(dataset)
if resolved:
return resolved
payload = _request_json(
"POST",
"/api/v1/datasets",
json_payload={"name": dataset, "chunk_method": chunk_method},
)
data = payload.get("data") or {}
return data["id"], data.get("name", dataset)
def _prepare_document(document_url: str) -> tuple[Path, bool]:
parsed = urlparse(document_url)
if parsed.scheme in {"http", "https"}:
response = requests.get(document_url, timeout=120)
response.raise_for_status()
suffix = Path(parsed.path).suffix or ".bin"
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
tmp.write(response.content)
tmp.flush()
tmp.close()
return Path(tmp.name), True
if parsed.scheme == "file":
return Path(parsed.path), False
return Path(document_url).expanduser(), False
def _detect_chunk_method(path: Path) -> str:
extension = path.suffix.lower()
if extension not in SUPPORTED_EXTENSIONS:
supported = ", ".join(sorted(SUPPORTED_EXTENSIONS))
raise ValueError(f"Unsupported document type '{extension or path.name}'. Supported document types: {supported}")
return SUPPORTED_EXTENSIONS[extension]
def _upload_document(dataset_id: str, path: Path) -> list[str]:
mime = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
with path.open("rb") as handle:
payload = _request_json(
"POST",
f"/api/v1/datasets/{dataset_id}/documents",
files=[("file", (path.name, handle, mime))],
)
documents = payload.get("data") or []
ids = [item["id"] for item in documents if item.get("id")]
if not ids:
raise RuntimeError("RAGFlow upload did not return any document ids")
return ids
def ragflow_ingest_tool(document_url: str, dataset: str) -> str:
local_path = None
should_cleanup = False
try:
local_path, should_cleanup = _prepare_document(document_url)
if not local_path.exists():
return tool_error(f"Document not found: {document_url}")
chunk_method = _detect_chunk_method(local_path)
dataset_id, dataset_name = _ensure_dataset(dataset, chunk_method)
document_ids = _upload_document(dataset_id, local_path)
_request_json(
"POST",
f"/api/v1/datasets/{dataset_id}/chunks",
json_payload={"document_ids": document_ids},
)
return tool_result(
success=True,
dataset_id=dataset_id,
dataset_name=dataset_name,
document_ids=document_ids,
parse_started=True,
chunk_method=chunk_method,
source=document_url,
filename=local_path.name,
)
except ValueError as exc:
return tool_error(str(exc))
except Exception as exc:
return tool_error(f"RAGFlow ingest failed: {exc}")
finally:
if should_cleanup and local_path is not None:
try:
local_path.unlink(missing_ok=True)
except Exception:
pass
def _normalize_chunks(chunks: list[dict[str, Any]]) -> list[dict[str, Any]]:
normalized = []
for chunk in chunks:
normalized.append(
{
"content": chunk.get("content", ""),
"document_id": chunk.get("document_id", ""),
"document_name": chunk.get("document_keyword", ""),
"similarity": chunk.get("similarity"),
"highlight": chunk.get("highlight", ""),
}
)
return normalized
def ragflow_query_tool(query: str, dataset: str, limit: int = 5) -> str:
try:
resolved = _resolve_dataset(dataset)
if not resolved:
return tool_error(f"RAGFlow dataset not found: {dataset}")
dataset_id, dataset_name = resolved
payload = _request_json(
"POST",
"/api/v1/retrieval",
json_payload={
"question": query,
"dataset_ids": [dataset_id],
"page_size": max(1, min(int(limit), 25)),
"highlight": True,
"keyword": True,
},
)
data = payload.get("data") or {}
chunks = data.get("chunks") or []
return tool_result(
success=True,
dataset_id=dataset_id,
dataset_name=dataset_name,
total=data.get("total", len(chunks)),
chunks=_normalize_chunks(chunks),
)
except Exception as exc:
return tool_error(f"RAGFlow query failed: {exc}")
def _handle_ragflow_ingest(args, **_kwargs):
return ragflow_ingest_tool(
document_url=args.get("document_url", ""),
dataset=args.get("dataset", ""),
)
def _handle_ragflow_query(args, **_kwargs):
return ragflow_query_tool(
query=args.get("query", ""),
dataset=args.get("dataset", ""),
limit=args.get("limit", 5),
)
registry.register(
name="ragflow_ingest",
toolset="web",
schema=RAGFLOW_INGEST_SCHEMA,
handler=_handle_ragflow_ingest,
check_fn=_ragflow_check_requirements,
requires_env=["RAGFLOW_API_URL", "RAGFLOW_API_KEY"],
emoji="📚",
)
registry.register(
name="ragflow_query",
toolset="web",
schema=RAGFLOW_QUERY_SCHEMA,
handler=_handle_ragflow_query,
check_fn=_ragflow_check_requirements,
requires_env=["RAGFLOW_API_URL", "RAGFLOW_API_KEY"],
emoji="🧠",
)

View File

@@ -0,0 +1,86 @@
#!/usr/bin/env python3
"""
Sovereign Scavenger — Autonomous Backlog Grooming.
Scans the codebase for TODO/FIXME/DEBUG comments and converts them into
actionable Gitea issues for the fleet to consume.
"""
import os
import re
import logging
from typing import List, Dict, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
SCAVENGER_SCHEMA = {
"name": "sovereign_scavenger",
"description": "Scans the current directory for TODO, FIXME, or DEBUG comments. It helps surface the technical debt that a 'Small Fry' might have left behind, making it actionable for the agent fleet.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to scan (defaults to current directory).", "default": "."},
"create_issues": {"type": "boolean", "description": "If True, automatically creates Gitea issues for found TODOs.", "default": False}
}
}
}
def find_todos(root_path: str):
"""Scan files for TODO patterns."""
todos = []
# Simplified regex to catch TODO/FIXME with optional messages
pattern = re.compile(r'#.*(TODO|FIXME|DEBUG|XXX)[:s]*(.*)', re.IGNORECASE)
for root, dirs, files in os.walk(root_path):
# Skip hidden and annoying dirs
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ['node_modules', 'dist', '__pycache__']]
for file in files:
if not file.endswith(('.py', '.ts', '.js', '.md', '.txt')):
continue
filepath = os.path.join(root, file)
try:
with open(filepath, 'r', encoding='utf-8') as f:
for i, line in enumerate(f, 1):
match = pattern.search(line)
if match:
todos.append({
"type": match.group(1).upper(),
"message": match.group(2).strip() or "No description provided.",
"file": filepath,
"line": i
})
except Exception as e:
logger.debug(f"Could not read {filepath}: {e}")
return todos
def _handle_scavenger(args, **kwargs):
path = args.get("path", ".")
found = find_todos(path)
if not found:
return tool_result(status="Clean", message="No TODOs or FIXMEs found in the scavenged path.")
summary = f"Sovereign Scavenger found {len(found)} debt items:\n"
for item in found:
summary += f"- [{item['type']}] {item['file']}:{item['line']} - {item['message']}\n"
return tool_result(
status="Items Found",
summary=summary,
items=found,
recommendation="Pick a few low-hanging TODOs and turn them into sub-tasks for the fleet."
)
registry.register(
name="sovereign_scavenger",
toolset="dispatch",
schema=SCAVENGER_SCHEMA,
handler=_handle_scavenger,
emoji="🧹"
)

109
tools/static_analyzer.py Normal file
View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""
GOFAI Static Analyzer — Deterministic risk assessment for autonomous code.
Detects high-risk patterns like infinite loops, resource exhaustion,
and circular dependencies using AST analysis.
"""
import ast
import logging
import os
from typing import List, Dict, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
STATIC_ANALYZE_SCHEMA = {
"name": "static_analyze",
"description": "Perform an advanced GOFAI static analysis of code. Detects infinite loops, potential memory leaks (unbounded collections), and circular dependency risks without using an LLM. Use this to ensure your code is 'Fleet-Safe'.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to the file to analyze."}
},
"required": ["path"]
}
}
class RiskAnalyzer(ast.NodeVisitor):
def __init__(self):
self.risks = []
self.current_function = None
def visit_FunctionDef(self, node):
old_func = self.current_function
self.current_function = node.name
self.generic_visit(node)
self.current_function = old_func
def visit_While(self, node):
# Check for 'while True' or 'while 1'
if isinstance(node.test, ast.Constant) and node.test.value is True:
# Look for 'break' or 'return' inside the loop
has_exit = any(isinstance(child, (ast.Break, ast.Return)) for child in ast.walk(node))
if not has_exit:
self.risks.append({
"type": "Infinite Loop Risk",
"location": f"{self.current_function or 'module'} (line {node.lineno})",
"severity": "HIGH",
"message": "Potential infinite loop: 'while True' found without clear break/return path."
})
self.generic_visit(node)
def visit_For(self, node):
# Basic check for modifying the sequence being iterated (common error)
if isinstance(node.target, ast.Name):
for child in ast.walk(node.body):
if isinstance(child, ast.Call) and isinstance(child.func, ast.Attribute):
if child.func.attr in ['append', 'extend', 'pop', 'remove']:
if isinstance(child.func.value, ast.Name) and child.func.value.id == node.target.id:
self.risks.append({
"type": "Mutation Risk",
"location": f"{self.current_function or 'module'} (line {node.lineno})",
"severity": "MEDIUM",
"message": f"Loop modifies iterator variable '{node.target.id}'."
})
self.generic_visit(node)
def run_analysis(path: str):
"""Run the static analysis pipeline."""
try:
source = open(path, "r").read()
tree = ast.parse(source)
analyzer = RiskAnalyzer()
analyzer.visit(tree)
if not analyzer.risks:
return tool_result(
status="Verified Safe",
message="No high-risk GOFAI patterns detected. Code appears compliant with Fleet execution safety standards."
)
summary = "GOFAI RISK ASSESSMENT REPORT:\n"
for risk in analyzer.risks:
summary += f"- [{risk['severity']}] {risk['type']} in {risk['location']}: {risk['message']}\n"
return tool_result(
status="Risk Detected",
summary=summary,
risks=analyzer.risks,
recommendation="Address the identified risks before deploying this code to the fleet."
)
except Exception as e:
return tool_error(f"Static analysis failed: {str(e)}")
def _handle_static_analyze(args, **kwargs):
return run_analysis(args.get("path"))
registry.register(
name="static_analyze",
toolset="qa",
schema=STATIC_ANALYZE_SCHEMA,
handler=_handle_static_analyze,
emoji="🛡️"
)

167
tools/symbolic_verify.py Normal file
View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""
Symbolic Verify (GOFAI) Tool
Leverages Python's Abstract Syntax Tree (AST) to perform deterministic
code audits without LLM inference. Detects 'LLM-isms' like undefined
variables, shadow variables, and scoping errors.
"""
import ast
import json
import logging
import os
from typing import Dict, List, Set, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
SYMBOLIC_VERIFY_SCHEMA = {
"name": "symbolic_verify",
"description": "Perform a deterministic GOFAI audit of code using AST analysis. Identifies undefined variables, unused imports, and scoping issues without using an LLM. Use this to verify your changes are syntactically and semantically sound before submission.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to the Python file to audit."},
"check_level": {
"type": "string",
"enum": ["syntax", "scope", "all"],
"default": "all",
"description": "Level of analysis to perform."
}
},
"required": ["path"]
}
}
class ScopeAnalyzer(ast.NodeVisitor):
def __init__(self):
self.defined_vars = set()
self.used_vars = set()
self.undefined_references = []
self.scopes = [{}] # Stack of symbol tables
self.builtins = set(dir(__builtins__))
def visit_Import(self, node):
for alias in node.names:
name = alias.asname or alias.name
self.scopes[-1][name] = "import"
self.generic_visit(node)
def visit_ImportFrom(self, node):
for alias in node.names:
name = alias.asname or alias.name
self.scopes[-1][name] = "import"
self.generic_visit(node)
def visit_Name(self, node):
if isinstance(node.ctx, ast.Store):
self.scopes[-1][node.id] = "defined"
elif isinstance(node.ctx, ast.Load):
# Check if defined in any scope level or builtins
is_defined = any(node.id in scope for scope in self.scopes) or node.id in self.builtins
if not is_defined:
# Store potential undefined
self.undefined_references.append({
"name": node.id,
"lineno": node.lineno,
"col": node.col_offset
})
self.generic_visit(node)
def visit_FunctionDef(self, node):
self.scopes[-1][node.name] = "function"
# New scope for arguments and body
new_scope = {}
for arg in node.args.args:
new_scope[arg.arg] = "parameter"
self.scopes.append(new_scope)
self.generic_visit(node)
self.scopes.pop()
def visit_ClassDef(self, node):
self.scopes[-1][node.name] = "class"
self.scopes.append({})
self.generic_visit(node)
self.scopes.pop()
def audit_file(path: str, check_level: str = "all"):
"""Audit a Python file for common semantic errors."""
if not path.endswith(".py"):
return tool_error("Symbolic verification only supports Python (.py) files.")
try:
if not os.path.exists(path):
return tool_error(f"File not found: {path}")
source = open(path, "r").read()
# 1. Syntax Check
try:
tree = ast.parse(source)
except SyntaxError as e:
return tool_result(
status="Critical Failure",
errors=[{
"type": "SyntaxError",
"message": e.msg,
"lineno": e.lineno,
"offset": e.offset
}],
recommendation="Fix the syntax error immediately. The file cannot be executed."
)
if check_level == "syntax":
return tool_result(status="Clean", message="Syntax is valid.")
# 2. Scope & Reference Search
analyzer = ScopeAnalyzer()
analyzer.visit(tree)
# Filter out common false positives (e.g. late imports or dynamic names)
# For a truly robust GOFAI we'd do more, but this is 'secret sauce' level
undefined = []
seen = set()
for ref in analyzer.undefined_references:
key = (ref["name"], ref["lineno"])
if key not in seen:
undefined.append(ref)
seen.add(key)
if not undefined:
return tool_result(
status="Healthy",
message="Deterministic check passed. No undefined variables detected in analyzed scopes.",
file_stats={
"chars": len(source),
"nodes": len(list(ast.walk(tree)))
}
)
report = "GOFAI AUDIT DETECTED SEMANTIC ISSUES:\n"
for u in undefined:
report += f"- Undefined Variable: '{u['name']}' at line {u['lineno']}\n"
return tool_result(
status="Warning",
summary=report,
undefined_variables=undefined,
recommendation="Review the undefined variables. Ensure they are imported or defined before use."
)
except Exception as e:
return tool_error(f"Symbolic audit failed: {str(e)}")
def _handle_symbolic_verify(args, **kwargs):
return audit_file(args.get("path"), args.get("check_level", "all"))
registry.register(
name="symbolic_verify",
toolset="qa",
schema=SYMBOLIC_VERIFY_SCHEMA,
handler=_handle_symbolic_verify,
emoji="🔬"
)