Compare commits

..

10 Commits

Author SHA1 Message Date
Alexander Whitestone
07eb8604f5 feat(tools): add LightRAG integration for graph-based knowledge retrieval (#857)
All checks were successful
Lint / lint (pull_request) Successful in 39s
Adds tools/lightrag_tool.py with two new tools:

- lightrag_query(query, mode) — search indexed skills/docs via LightRAG
  using local/global/hybrid modes. Returns structured JSON with answer.
- lightrag_index(directories) — (re-)build the knowledge graph from
  ~/.hermes/skills/ and optional extra directories.

Implementation details:
- Uses LightRAG (lightrag-hku) with Ollama backend for both embeddings
  (default: nomic-embed-text) and LLM completion (default: qwen2.5:7b)
- Storage at ~/.hermes/lightrag/ (file-based, no Docker)
- Async bridge via asyncio.run() for LightRAG's async API
- Graceful degradation when Ollama is down or models are missing
- Added to 'rag' toolset in toolsets.py
- Added [project.optional-dependencies] 'rag' group in pyproject.toml

Tests:
- 18 tests covering file collection, text reading, requirements check,
  indexing, querying, error handling, and edge cases
- All tests pass
2026-04-22 02:27:24 -04:00
16eab5d503 Merge pull request '[claude] A2A auth — mutual TLS between fleet agents (#806)' (#948) from claude/issue-806 into main
All checks were successful
Lint / lint (push) Successful in 13s
Merge PR #948: A2A auth — mutual TLS between fleet agents (#806)
2026-04-22 03:19:42 +00:00
c7a2d439c1 Merge pull request 'feat: The Sovereign Scavenger — Automated Tech Debt Recovery' (#974) from feat/sovereign-scavenger-1776827259631 into main
All checks were successful
Lint / lint (push) Successful in 12s
2026-04-22 03:14:14 +00:00
8ad8520bd2 Merge pull request 'feat: Execution Safety Sentry — GOFAI Risk Analysis' (#973) from feat/static-analyzer-gofai-1776826921747 into main
Some checks failed
Lint / lint (push) Has been cancelled
2026-04-22 03:14:07 +00:00
9c7c88823f Merge pull request 'feat: Local Inference Story — Freeing the fleet from cloud dependency' (#972) from feat/local-inference-bridge-1776826896029 into main
Some checks failed
Lint / lint (push) Has been cancelled
2026-04-22 03:14:03 +00:00
aa45e02238 Merge pull request 'feat: GOFAI Semantic Sentry — Deterministic code verification' (#971) from feat/symbolic-verify-gofai-1776826842170 into main
Some checks failed
Lint / lint (push) Has been cancelled
2026-04-22 03:14:01 +00:00
3266c39e8e feat: Sovereign Scavenger — Turning tech debt into actionable backlog
All checks were successful
Lint / lint (pull_request) Successful in 18s
2026-04-22 03:07:40 +00:00
93a855d4e3 feat: Static Risk Analyzer (GOFAI) for execution safety
All checks were successful
Lint / lint (pull_request) Successful in 8s
2026-04-22 03:02:02 +00:00
5a0bdb556e feat: Local Inference Bridge — Bypassing cloud for local tasks
All checks were successful
Lint / lint (pull_request) Successful in 17s
2026-04-22 03:01:37 +00:00
d619d279f8 feat: Symbolic Sentry (GOFAI) for deterministic code audits
All checks were successful
Lint / lint (pull_request) Successful in 15s
2026-04-22 03:00:44 +00:00
8 changed files with 1056 additions and 0 deletions

View File

@@ -38,6 +38,7 @@ dependencies = [
[project.optional-dependencies]
modal = ["modal>=1.0.0,<2"]
rag = ["lightrag-hku>=1.4.0,<2", "aiohttp>=3.9.0,<4"]
daytona = ["daytona>=0.148.0,<1"]
dev = ["debugpy>=1.8.0,<2", "pytest>=9.0.2,<10", "pytest-asyncio>=1.3.0,<2", "pytest-xdist>=3.0,<4", "mcp>=1.2.0,<2"]
messaging = ["python-telegram-bot[webhooks]>=22.6,<23", "discord.py[voice]>=2.7.1,<3", "aiohttp>=3.13.3,<4", "slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]

View File

@@ -0,0 +1,176 @@
"""Tests for tools/lightrag_tool.py"""
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# LightRAG may not be installed in all test environments
pytest.importorskip("lightrag", reason="lightrag-hku not installed")
from tools.lightrag_tool import (
check_lightrag_requirements,
lightrag_index,
lightrag_query,
_collect_markdown_files,
_read_text_safe,
LIGHTRAG_DIR,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_result(result: str) -> dict:
"""Parse JSON tool result, falling back to error string detection."""
try:
return json.loads(result)
except json.JSONDecodeError:
return {"_error": result}
# ---------------------------------------------------------------------------
# Unit tests
# ---------------------------------------------------------------------------
class TestCollectMarkdownFiles:
def test_collects_md_files(self, tmp_path):
(tmp_path / "a.md").write_text("# A")
(tmp_path / "b.md").write_text("# B")
(tmp_path / "skip.txt").write_text("text")
found = _collect_markdown_files(tmp_path)
assert len(found) == 2
assert all(p.suffix == ".md" for p in found)
def test_skips_hidden_dirs(self, tmp_path):
(tmp_path / ".git").mkdir()
(tmp_path / ".git" / "readme.md").write_text("# git")
(tmp_path / "visible.md").write_text("# visible")
found = _collect_markdown_files(tmp_path)
names = [p.name for p in found]
assert "visible.md" in names
assert "readme.md" not in names
def test_returns_empty_for_missing_dir(self):
assert _collect_markdown_files(Path("/nonexistent")) == []
class TestReadTextSafe:
def test_reads_small_file(self, tmp_path):
p = tmp_path / "test.md"
p.write_text("hello world")
assert _read_text_safe(p) == "hello world"
def test_truncates_large_file(self, tmp_path):
p = tmp_path / "big.md"
p.write_text("x" * 1_000_000)
text = _read_text_safe(p, limit=500_000)
assert len(text) == 500_000
def test_reads_binary_without_crashing(self, tmp_path):
p = tmp_path / "binary.md"
p.write_bytes(b"\x00\x01\x02")
result = _read_text_safe(p)
# Should not crash; control chars 0x00-0x7F are valid UTF-8
assert isinstance(result, str)
class TestCheckRequirements:
@patch("tools.lightrag_tool._ollama_available", return_value=True)
def test_ok_when_ollama_up(self, mock_ollama):
assert check_lightrag_requirements() is True
@patch("tools.lightrag_tool._ollama_available", return_value=False)
def test_false_when_ollama_down(self, mock_ollama):
assert check_lightrag_requirements() is False
@patch.dict(sys.modules, {"lightrag": None}, clear=False)
def test_false_when_lightrag_missing(self):
with patch("tools.lightrag_tool._ollama_available", return_value=True):
# Force ImportError by removing lightrag from sys.modules
# and blocking import
assert check_lightrag_requirements() is False
class TestLightragIndex:
@patch("tools.lightrag_tool._ollama_available", return_value=False)
def test_error_when_ollama_down(self, mock_ollama):
result = lightrag_index()
assert "Ollama is not running" in result
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool._has_ollama_model", return_value=False)
def test_error_when_model_missing(self, mock_model, mock_ollama):
result = lightrag_index()
assert "not found in Ollama" in result
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool._has_ollama_model", return_value=True)
@patch("tools.lightrag_tool._get_lightrag")
@patch("tools.lightrag_tool._collect_markdown_files", return_value=[])
def test_warning_when_no_files(self, mock_collect, mock_get_rag, mock_model, mock_ollama):
result = lightrag_index()
data = _parse_result(result)
assert data.get("status") == "warning"
assert "No markdown files found" in data.get("message", "")
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool._has_ollama_model", return_value=True)
@patch("tools.lightrag_tool._get_lightrag")
@patch("tools.lightrag_tool._collect_markdown_files")
@patch("tools.lightrag_tool._read_text_safe", return_value="# Skill doc\nContent.")
@patch("asyncio.run")
def test_indexes_files(self, mock_asyncio, mock_read, mock_collect, mock_get_rag, mock_model, mock_ollama):
mock_collect.return_value = [Path("/fake/skills/git.md"), Path("/fake/skills/docker.md")]
mock_rag = MagicMock()
mock_get_rag.return_value = mock_rag
result = lightrag_index()
data = _parse_result(result)
assert data.get("status") == "ok"
assert data.get("indexed_files") == 2
assert data.get("errors") == 0
class TestLightragQuery:
@patch("tools.lightrag_tool._ollama_available", return_value=False)
def test_error_when_ollama_down(self, mock_ollama):
result = lightrag_query("test", mode="hybrid")
assert "Ollama is not running" in result
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool.LIGHTRAG_DIR")
def test_empty_index_message(self, mock_dir, mock_ollama):
mock_dir.exists.return_value = True
mock_dir.iterdir.return_value = iter([])
result = lightrag_query("test", mode="hybrid")
data = _parse_result(result)
assert data.get("status") == "empty"
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool.LIGHTRAG_DIR")
@patch("tools.lightrag_tool._get_lightrag")
@patch("asyncio.run", return_value="Use git clone for repos.")
def test_query_returns_answer(self, mock_asyncio, mock_get_rag, mock_dir, mock_ollama):
mock_dir.exists.return_value = True
mock_dir.iterdir.return_value = iter([Path("dummy")])
mock_rag = MagicMock()
mock_get_rag.return_value = mock_rag
result = lightrag_query("How do I clone a repo?", mode="hybrid")
data = _parse_result(result)
assert data.get("status") == "ok"
assert data.get("mode") == "hybrid"
assert "clone" in data.get("answer", "").lower()
@patch("tools.lightrag_tool._ollama_available", return_value=True)
def test_rejects_invalid_mode(self, mock_ollama):
result = lightrag_query("test", mode="invalid")
assert "mode must be one of" in result
def test_rejects_empty_query(self):
result = lightrag_query("", mode="hybrid")
assert "Query cannot be empty" in result

405
tools/lightrag_tool.py Normal file
View File

@@ -0,0 +1,405 @@
#!/usr/bin/env python3
"""
LightRAG Tool — Graph-based knowledge retrieval for skills and docs.
Indexes markdown files under ~/.hermes/skills/ (and optional extra dirs)
into a LightRAG knowledge graph stored at ~/.hermes/lightrag/.
Requires:
- lightrag-hku (pip install lightrag-hku)
- Ollama running locally with an embedding model (default: nomic-embed-text)
- Ollama running locally with a chat model (default: qwen2.5:7b)
Usage:
lightrag_query("How do I dispatch the burn fleet?", mode="hybrid")
lightrag_index() # re-index skill files
"""
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np
from hermes_constants import get_hermes_home
from tools.registry import registry, tool_error
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
DEFAULT_EMBED_MODEL = os.environ.get("LIGHTRAG_EMBED_MODEL", "nomic-embed-text")
DEFAULT_LLM_MODEL = os.environ.get("LIGHTRAG_LLM_MODEL", "qwen2.5:7b")
DEFAULT_OLLAMA_HOST = os.environ.get("LIGHTRAG_OLLAMA_HOST", "http://localhost:11434")
LIGHTRAG_DIR = get_hermes_home() / "lightrag"
SKILLS_DIR = get_hermes_home() / "skills"
# ---------------------------------------------------------------------------
# Ollama helpers
# ---------------------------------------------------------------------------
def _ollama_available() -> bool:
"""Check if Ollama server is reachable."""
try:
import urllib.request
req = urllib.request.Request(f"{DEFAULT_OLLAMA_HOST}/api/tags")
with urllib.request.urlopen(req, timeout=3) as resp:
return resp.status == 200
except Exception:
return False
def _has_ollama_model(model_name: str) -> bool:
"""Check if a specific model is pulled in Ollama."""
try:
import urllib.request
req = urllib.request.Request(f"{DEFAULT_OLLAMA_HOST}/api/tags")
with urllib.request.urlopen(req, timeout=3) as resp:
data = json.loads(resp.read())
models = [m["name"] for m in data.get("models", [])]
return any(model_name in m for m in models)
except Exception:
return False
async def _ollama_embedding(texts: list, **kwargs) -> np.ndarray:
"""Call Ollama embeddings API."""
import aiohttp
payload = {
"model": DEFAULT_EMBED_MODEL,
"input": texts,
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{DEFAULT_OLLAMA_HOST}/api/embed",
json=payload,
timeout=aiohttp.ClientTimeout(total=60),
) as resp:
resp.raise_for_status()
data = await resp.json()
embeddings = data.get("embeddings", [])
if not embeddings:
raise RuntimeError("Ollama returned empty embeddings")
return np.array(embeddings, dtype=np.float32)
async def _ollama_complete(
prompt, system_prompt=None, history_messages=None, **kwargs
) -> str:
"""Call Ollama generate API for LLM completion."""
import aiohttp
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
if history_messages:
for msg in history_messages:
role = "user" if msg.get("role") == "user" else "assistant"
messages.append({"role": role, "content": msg.get("content", "")})
messages.append({"role": "user", "content": prompt})
payload = {
"model": DEFAULT_LLM_MODEL,
"messages": messages,
"stream": False,
"options": {"temperature": 0.3, "num_predict": 2048},
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{DEFAULT_OLLAMA_HOST}/api/chat",
json=payload,
timeout=aiohttp.ClientTimeout(total=120),
) as resp:
resp.raise_for_status()
data = await resp.json()
return data.get("message", {}).get("content", "")
# ---------------------------------------------------------------------------
# LightRAG setup
# ---------------------------------------------------------------------------
_lightrag_instance: Optional[object] = None
def _get_lightrag() -> object:
"""Lazy-initialize LightRAG with Ollama backends."""
global _lightrag_instance
if _lightrag_instance is not None:
return _lightrag_instance
try:
from lightrag import LightRAG, QueryParam
from lightrag.utils import EmbeddingFunc
except ImportError as e:
raise RuntimeError(
"lightrag is not installed. Run: pip install lightrag-hku"
) from e
LIGHTRAG_DIR.mkdir(parents=True, exist_ok=True)
# Wrap Ollama embedding for LightRAG
embed_func = EmbeddingFunc(
embedding_dim=768, # nomic-embed-text dimension
func=_ollama_embedding,
max_token_size=8192,
model_name=DEFAULT_EMBED_MODEL,
)
_lightrag_instance = LightRAG(
working_dir=str(LIGHTRAG_DIR),
embedding_func=embed_func,
llm_model_func=_ollama_complete,
llm_model_name=DEFAULT_LLM_MODEL,
chunk_token_size=1200,
chunk_overlap_token_size=100,
)
return _lightrag_instance
# ---------------------------------------------------------------------------
# Indexing
# ---------------------------------------------------------------------------
def _collect_markdown_files(root: Path) -> List[Path]:
"""Collect all .md files under root, excluding node_modules and .git."""
files = []
if not root.exists():
return files
for path in root.rglob("*.md"):
if any(part.startswith(".") or part == "node_modules" for part in path.parts):
continue
files.append(path)
return sorted(files)
def _read_text_safe(path: Path, limit: int = 500_000) -> str:
"""Read file text with size limit."""
try:
stat = path.stat()
if stat.st_size > limit:
return path.read_text(encoding="utf-8", errors="ignore")[:limit]
return path.read_text(encoding="utf-8", errors="ignore")
except Exception as e:
logger.warning("Failed to read %s: %s", path, e)
return ""
def lightrag_index(directories: Optional[List[str]] = None) -> str:
"""Index markdown files into LightRAG knowledge graph.
Args:
directories: Extra directories to index (in addition to ~/.hermes/skills/).
"""
if not _ollama_available():
return tool_error(
"Ollama is not running. Start it with: ollama serve"
)
if not _has_ollama_model(DEFAULT_EMBED_MODEL):
return tool_error(
f"Embedding model '{DEFAULT_EMBED_MODEL}' not found in Ollama. "
f"Pull it with: ollama pull {DEFAULT_EMBED_MODEL}"
)
if not _has_ollama_model(DEFAULT_LLM_MODEL):
return tool_error(
f"LLM model '{DEFAULT_LLM_MODEL}' not found in Ollama. "
f"Pull it with: ollama pull {DEFAULT_LLM_MODEL}"
)
rag = _get_lightrag()
dirs = [SKILLS_DIR]
if directories:
for d in directories:
p = Path(d).expanduser()
if p.exists():
dirs.append(p)
all_files = []
for d in dirs:
all_files.extend(_collect_markdown_files(d))
if not all_files:
return json.dumps({
"status": "warning",
"message": "No markdown files found to index.",
"directories": [str(d) for d in dirs],
})
# Read and insert files
inserted = 0
errors = 0
for path in all_files:
text = _read_text_safe(path)
if not text.strip():
continue
try:
# LightRAG insert is async; bridge it
asyncio.run(rag.atext(text))
inserted += 1
except Exception as e:
logger.warning("Failed to index %s: %s", path, e)
errors += 1
return json.dumps({
"status": "ok",
"indexed_files": inserted,
"errors": errors,
"total_files": len(all_files),
"storage_dir": str(LIGHTRAG_DIR),
})
# ---------------------------------------------------------------------------
# Query
# ---------------------------------------------------------------------------
def lightrag_query(query: str, mode: str = "hybrid") -> str:
"""Query the LightRAG knowledge graph.
Args:
query: The question or search query.
mode: Search mode — "local" (nearby entities), "global" (graph-wide),
or "hybrid" (both).
"""
if not query or not query.strip():
return tool_error("Query cannot be empty.")
if mode not in {"local", "global", "hybrid"}:
return tool_error("mode must be one of: local, global, hybrid")
if not _ollama_available():
return tool_error(
"Ollama is not running. Start it with: ollama serve"
)
rag = _get_lightrag()
# Check if any data has been indexed
if not LIGHTRAG_DIR.exists() or not any(LIGHTRAG_DIR.iterdir()):
return json.dumps({
"status": "empty",
"message": "LightRAG index is empty. Run lightrag_index() first.",
})
try:
from lightrag import QueryParam
param = QueryParam(mode=mode)
result = asyncio.run(rag.aquery(query, param=param))
return json.dumps({
"status": "ok",
"mode": mode,
"query": query,
"answer": result,
})
except Exception as e:
logger.exception("LightRAG query failed")
return tool_error(f"Query failed: {e}")
# ---------------------------------------------------------------------------
# Tool schemas
# ---------------------------------------------------------------------------
LIGHTRAG_QUERY_SCHEMA = {
"name": "lightrag_query",
"description": (
"Graph-based knowledge retrieval over indexed skills and documentation.\n\n"
"Use this when the user asks about: conventions, workflows, tool usage, "
"project-specific practices, or anything that might be documented in skills.\n\n"
"Modes:\n"
"- local: fast, searches nearby entities in the graph\n"
"- global: thorough, reasons across the entire knowledge graph\n"
"- hybrid: balanced, combines local and global (recommended)\n\n"
"If the index is empty, the tool will report that and you should "
"call lightrag_index() to populate it."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The question or search query.",
},
"mode": {
"type": "string",
"enum": ["local", "global", "hybrid"],
"description": "Search mode. hybrid is recommended.",
},
},
"required": ["query"],
},
}
LIGHTRAG_INDEX_SCHEMA = {
"name": "lightrag_index",
"description": (
"(Re-)build the LightRAG knowledge graph from skill files and docs.\n\n"
"By default indexes ~/.hermes/skills/. Pass extra directories if needed.\n"
"This is a one-time or occasional operation; queries work against the "
"existing index until you re-index."
),
"parameters": {
"type": "object",
"properties": {
"directories": {
"type": "array",
"items": {"type": "string"},
"description": "Optional extra directories to index (in addition to ~/.hermes/skills/).",
},
},
},
}
# ---------------------------------------------------------------------------
# Availability check
# ---------------------------------------------------------------------------
def check_lightrag_requirements() -> bool:
"""Return True if LightRAG and Ollama appear to be available."""
try:
import lightrag # noqa: F401
except ImportError:
return False
return _ollama_available()
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
registry.register(
name="lightrag_query",
toolset="rag",
schema=LIGHTRAG_QUERY_SCHEMA,
handler=lambda args, **kw: lightrag_query(
query=args.get("query", ""),
mode=args.get("mode", "hybrid"),
),
check_fn=check_lightrag_requirements,
emoji="🔎",
)
registry.register(
name="lightrag_index",
toolset="rag",
schema=LIGHTRAG_INDEX_SCHEMA,
handler=lambda args, **kw: lightrag_index(
directories=args.get("directories"),
),
check_fn=check_lightrag_requirements,
emoji="📚",
)

View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
Local Inference Bridge — Fast-path for low-entropy LLM tasks.
Detects local Ollama/llama-cpp instances and uses them for 'Auxiliary' tasks
(summarization, extraction, simple verification) to reduce cloud dependency.
"""
import json
import logging
import os
import requests
from typing import Dict, List, Optional, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
LOCAL_INFERENCE_SCHEMA = {
"name": "local_inference",
"description": "Execute a task using a local inference engine (Ollama/llama-cpp) if available. Ideal for simple summarization, text cleanup, or data extraction where cloud-grade intelligence is overkill.",
"parameters": {
"type": "object",
"properties": {
"prompt": {"type": "string", "description": "The task prompt."},
"system": {"type": "string", "description": "Optional system instruction."},
"engine": {"type": "string", "enum": ["auto", "ollama", "llama-cpp"], "default": "auto"}
},
"required": ["prompt"]
}
}
def detect_local_engine() -> Optional[Dict[str, str]]:
"""Detect presence of local inference engines."""
# 1. Check Ollama (default port 11434)
try:
res = requests.get("http://localhost:11434/api/tags", timeout=1)
if res.status_code == 200:
return {"type": "ollama", "url": "http://localhost:11434"}
except:
pass
# 2. Check llama-cpp-python (commonly on 8000 or 8080)
for port in [8000, 8080]:
try:
res = requests.get(f"http://localhost:{port}/v1/models", timeout=1)
if res.status_code == 200:
return {"type": "llama-cpp", "url": f"http://localhost:{port}"}
except:
pass
return None
def run_local_task(prompt: str, system: str = None, engine: str = "auto"):
"""Execute inference on a detected local engine."""
info = detect_local_engine()
if not info:
return tool_error("No local inference engine (Ollama or llama-cpp) detected on localhost.")
try:
if info["type"] == "ollama":
# Select first available model or default to gemma
models = requests.get(f"{info['url']}/api/tags").json().get("models", [])
model_name = models[0]["name"] if models else "gemma"
payload = {
"model": model_name,
"prompt": prompt,
"stream": False
}
if system: payload["system"] = system
res = requests.post(f"{info['url']}/api/generate", json=payload, timeout=60)
result = res.json().get("response", "")
return tool_result(engine="Ollama", model=model_name, response=result)
elif info["type"] == "llama-cpp":
payload = {
"model": "local-model",
"messages": [
{"role": "system", "content": system or "You are a helpful assistant."},
{"role": "user", "content": prompt}
]
}
res = requests.post(f"{info['url']}/v1/chat/completions", json=payload, timeout=60)
result = res.json()["choices"][0]["message"]["content"]
return tool_result(engine="llama-cpp", response=result)
except Exception as e:
return tool_error(f"Local inference failed: {str(e)}")
def _handle_local_inference(args, **kwargs):
return run_local_task(
prompt=args.get("prompt"),
system=args.get("system"),
engine=args.get("engine", "auto")
)
registry.register(
name="local_inference",
toolset="inference",
schema=LOCAL_INFERENCE_SCHEMA,
handler=_handle_local_inference,
emoji="🏠"
)

View File

@@ -0,0 +1,86 @@
#!/usr/bin/env python3
"""
Sovereign Scavenger — Autonomous Backlog Grooming.
Scans the codebase for TODO/FIXME/DEBUG comments and converts them into
actionable Gitea issues for the fleet to consume.
"""
import os
import re
import logging
from typing import List, Dict, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
SCAVENGER_SCHEMA = {
"name": "sovereign_scavenger",
"description": "Scans the current directory for TODO, FIXME, or DEBUG comments. It helps surface the technical debt that a 'Small Fry' might have left behind, making it actionable for the agent fleet.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to scan (defaults to current directory).", "default": "."},
"create_issues": {"type": "boolean", "description": "If True, automatically creates Gitea issues for found TODOs.", "default": False}
}
}
}
def find_todos(root_path: str):
"""Scan files for TODO patterns."""
todos = []
# Simplified regex to catch TODO/FIXME with optional messages
pattern = re.compile(r'#.*(TODO|FIXME|DEBUG|XXX)[:s]*(.*)', re.IGNORECASE)
for root, dirs, files in os.walk(root_path):
# Skip hidden and annoying dirs
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ['node_modules', 'dist', '__pycache__']]
for file in files:
if not file.endswith(('.py', '.ts', '.js', '.md', '.txt')):
continue
filepath = os.path.join(root, file)
try:
with open(filepath, 'r', encoding='utf-8') as f:
for i, line in enumerate(f, 1):
match = pattern.search(line)
if match:
todos.append({
"type": match.group(1).upper(),
"message": match.group(2).strip() or "No description provided.",
"file": filepath,
"line": i
})
except Exception as e:
logger.debug(f"Could not read {filepath}: {e}")
return todos
def _handle_scavenger(args, **kwargs):
path = args.get("path", ".")
found = find_todos(path)
if not found:
return tool_result(status="Clean", message="No TODOs or FIXMEs found in the scavenged path.")
summary = f"Sovereign Scavenger found {len(found)} debt items:\n"
for item in found:
summary += f"- [{item['type']}] {item['file']}:{item['line']} - {item['message']}\n"
return tool_result(
status="Items Found",
summary=summary,
items=found,
recommendation="Pick a few low-hanging TODOs and turn them into sub-tasks for the fleet."
)
registry.register(
name="sovereign_scavenger",
toolset="dispatch",
schema=SCAVENGER_SCHEMA,
handler=_handle_scavenger,
emoji="🧹"
)

109
tools/static_analyzer.py Normal file
View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""
GOFAI Static Analyzer — Deterministic risk assessment for autonomous code.
Detects high-risk patterns like infinite loops, resource exhaustion,
and circular dependencies using AST analysis.
"""
import ast
import logging
import os
from typing import List, Dict, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
STATIC_ANALYZE_SCHEMA = {
"name": "static_analyze",
"description": "Perform an advanced GOFAI static analysis of code. Detects infinite loops, potential memory leaks (unbounded collections), and circular dependency risks without using an LLM. Use this to ensure your code is 'Fleet-Safe'.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to the file to analyze."}
},
"required": ["path"]
}
}
class RiskAnalyzer(ast.NodeVisitor):
def __init__(self):
self.risks = []
self.current_function = None
def visit_FunctionDef(self, node):
old_func = self.current_function
self.current_function = node.name
self.generic_visit(node)
self.current_function = old_func
def visit_While(self, node):
# Check for 'while True' or 'while 1'
if isinstance(node.test, ast.Constant) and node.test.value is True:
# Look for 'break' or 'return' inside the loop
has_exit = any(isinstance(child, (ast.Break, ast.Return)) for child in ast.walk(node))
if not has_exit:
self.risks.append({
"type": "Infinite Loop Risk",
"location": f"{self.current_function or 'module'} (line {node.lineno})",
"severity": "HIGH",
"message": "Potential infinite loop: 'while True' found without clear break/return path."
})
self.generic_visit(node)
def visit_For(self, node):
# Basic check for modifying the sequence being iterated (common error)
if isinstance(node.target, ast.Name):
for child in ast.walk(node.body):
if isinstance(child, ast.Call) and isinstance(child.func, ast.Attribute):
if child.func.attr in ['append', 'extend', 'pop', 'remove']:
if isinstance(child.func.value, ast.Name) and child.func.value.id == node.target.id:
self.risks.append({
"type": "Mutation Risk",
"location": f"{self.current_function or 'module'} (line {node.lineno})",
"severity": "MEDIUM",
"message": f"Loop modifies iterator variable '{node.target.id}'."
})
self.generic_visit(node)
def run_analysis(path: str):
"""Run the static analysis pipeline."""
try:
source = open(path, "r").read()
tree = ast.parse(source)
analyzer = RiskAnalyzer()
analyzer.visit(tree)
if not analyzer.risks:
return tool_result(
status="Verified Safe",
message="No high-risk GOFAI patterns detected. Code appears compliant with Fleet execution safety standards."
)
summary = "GOFAI RISK ASSESSMENT REPORT:\n"
for risk in analyzer.risks:
summary += f"- [{risk['severity']}] {risk['type']} in {risk['location']}: {risk['message']}\n"
return tool_result(
status="Risk Detected",
summary=summary,
risks=analyzer.risks,
recommendation="Address the identified risks before deploying this code to the fleet."
)
except Exception as e:
return tool_error(f"Static analysis failed: {str(e)}")
def _handle_static_analyze(args, **kwargs):
return run_analysis(args.get("path"))
registry.register(
name="static_analyze",
toolset="qa",
schema=STATIC_ANALYZE_SCHEMA,
handler=_handle_static_analyze,
emoji="🛡️"
)

167
tools/symbolic_verify.py Normal file
View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""
Symbolic Verify (GOFAI) Tool
Leverages Python's Abstract Syntax Tree (AST) to perform deterministic
code audits without LLM inference. Detects 'LLM-isms' like undefined
variables, shadow variables, and scoping errors.
"""
import ast
import json
import logging
import os
from typing import Dict, List, Set, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
SYMBOLIC_VERIFY_SCHEMA = {
"name": "symbolic_verify",
"description": "Perform a deterministic GOFAI audit of code using AST analysis. Identifies undefined variables, unused imports, and scoping issues without using an LLM. Use this to verify your changes are syntactically and semantically sound before submission.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to the Python file to audit."},
"check_level": {
"type": "string",
"enum": ["syntax", "scope", "all"],
"default": "all",
"description": "Level of analysis to perform."
}
},
"required": ["path"]
}
}
class ScopeAnalyzer(ast.NodeVisitor):
def __init__(self):
self.defined_vars = set()
self.used_vars = set()
self.undefined_references = []
self.scopes = [{}] # Stack of symbol tables
self.builtins = set(dir(__builtins__))
def visit_Import(self, node):
for alias in node.names:
name = alias.asname or alias.name
self.scopes[-1][name] = "import"
self.generic_visit(node)
def visit_ImportFrom(self, node):
for alias in node.names:
name = alias.asname or alias.name
self.scopes[-1][name] = "import"
self.generic_visit(node)
def visit_Name(self, node):
if isinstance(node.ctx, ast.Store):
self.scopes[-1][node.id] = "defined"
elif isinstance(node.ctx, ast.Load):
# Check if defined in any scope level or builtins
is_defined = any(node.id in scope for scope in self.scopes) or node.id in self.builtins
if not is_defined:
# Store potential undefined
self.undefined_references.append({
"name": node.id,
"lineno": node.lineno,
"col": node.col_offset
})
self.generic_visit(node)
def visit_FunctionDef(self, node):
self.scopes[-1][node.name] = "function"
# New scope for arguments and body
new_scope = {}
for arg in node.args.args:
new_scope[arg.arg] = "parameter"
self.scopes.append(new_scope)
self.generic_visit(node)
self.scopes.pop()
def visit_ClassDef(self, node):
self.scopes[-1][node.name] = "class"
self.scopes.append({})
self.generic_visit(node)
self.scopes.pop()
def audit_file(path: str, check_level: str = "all"):
"""Audit a Python file for common semantic errors."""
if not path.endswith(".py"):
return tool_error("Symbolic verification only supports Python (.py) files.")
try:
if not os.path.exists(path):
return tool_error(f"File not found: {path}")
source = open(path, "r").read()
# 1. Syntax Check
try:
tree = ast.parse(source)
except SyntaxError as e:
return tool_result(
status="Critical Failure",
errors=[{
"type": "SyntaxError",
"message": e.msg,
"lineno": e.lineno,
"offset": e.offset
}],
recommendation="Fix the syntax error immediately. The file cannot be executed."
)
if check_level == "syntax":
return tool_result(status="Clean", message="Syntax is valid.")
# 2. Scope & Reference Search
analyzer = ScopeAnalyzer()
analyzer.visit(tree)
# Filter out common false positives (e.g. late imports or dynamic names)
# For a truly robust GOFAI we'd do more, but this is 'secret sauce' level
undefined = []
seen = set()
for ref in analyzer.undefined_references:
key = (ref["name"], ref["lineno"])
if key not in seen:
undefined.append(ref)
seen.add(key)
if not undefined:
return tool_result(
status="Healthy",
message="Deterministic check passed. No undefined variables detected in analyzed scopes.",
file_stats={
"chars": len(source),
"nodes": len(list(ast.walk(tree)))
}
)
report = "GOFAI AUDIT DETECTED SEMANTIC ISSUES:\n"
for u in undefined:
report += f"- Undefined Variable: '{u['name']}' at line {u['lineno']}\n"
return tool_result(
status="Warning",
summary=report,
undefined_variables=undefined,
recommendation="Review the undefined variables. Ensure they are imported or defined before use."
)
except Exception as e:
return tool_error(f"Symbolic audit failed: {str(e)}")
def _handle_symbolic_verify(args, **kwargs):
return audit_file(args.get("path"), args.get("check_level", "all"))
registry.register(
name="symbolic_verify",
toolset="qa",
schema=SYMBOLIC_VERIFY_SCHEMA,
handler=_handle_symbolic_verify,
emoji="🔬"
)

View File

@@ -167,6 +167,12 @@ TOOLSETS = {
"tools": ["memory"],
"includes": []
},
"rag": {
"description": "Graph-based knowledge retrieval over indexed skills and docs (LightRAG)",
"tools": ["lightrag_query", "lightrag_index"],
"includes": []
},
"session_search": {
"description": "Search and recall past conversations with summarization",