This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/mcp/discovery.py
Alexander Payne 56437751d3 Phase 4: Tool Registry Auto-Discovery
- @mcp_tool decorator for marking functions as tools
- ToolDiscovery class for introspecting modules and packages
- Automatic JSON schema generation from type hints
- AST-based discovery for files (without importing)
- Auto-bootstrap on startup (packages=['tools'] by default)
- Support for tags, categories, and metadata
- Updated registry with register_tool() convenience method
- Environment variable MCP_AUTO_BOOTSTRAP to disable
- 39 tests with proper isolation and cleanup

Files Added:
- src/mcp/discovery.py: Tool discovery and introspection
- src/mcp/bootstrap.py: Auto-bootstrap functionality
- tests/test_mcp_discovery.py: 26 tests
- tests/test_mcp_bootstrap.py: 13 tests

Files Modified:
- src/mcp/registry.py: Added tags, source_module, auto_discovered fields
- src/mcp/__init__.py: Export discovery and bootstrap modules
- src/dashboard/app.py: Auto-bootstrap on startup
2026-02-25 19:59:42 -05:00

442 lines
15 KiB
Python

"""MCP Tool Auto-Discovery — Introspect Python modules to find tools.
Automatically discovers functions marked with @mcp_tool decorator
and registers them with the MCP registry. Generates JSON schemas
from type hints.
"""
import ast
import importlib
import inspect
import logging
import pkgutil
import types
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Optional, Union, get_args, get_origin, get_type_hints

from .registry import ToolRegistry, tool_registry
# Module-level logger named after this module; used for all discovery diagnostics.
logger = logging.getLogger(__name__)
# Decorator to mark functions as MCP tools
def mcp_tool(
    name: Optional[str] = None,
    description: Optional[str] = None,
    category: str = "general",
    tags: Optional[list[str]] = None,
):
    """Decorator to mark a function as an MCP tool.

    Works both with arguments (``@mcp_tool(name="x")``) and bare
    (``@mcp_tool``). The function itself is returned unchanged apart from
    the ``_mcp_*`` marker attributes that are attached to it.

    Args:
        name: Tool name (defaults to function name). When the decorator is
            applied bare, Python passes the decorated function here.
        description: Tool description (defaults to the stripped docstring)
        category: Tool category for organization
        tags: Additional tags for filtering

    Returns:
        The decorated function (bare usage) or a decorator that returns it.

    Example:
        @mcp_tool(name="weather", category="external")
        def get_weather(city: str) -> dict:
            '''Get weather for a city.'''
            ...

        @mcp_tool
        def ping() -> str:
            '''Health check.'''
            ...
    """
    # Bare usage (@mcp_tool without parentheses) hands the function in as the
    # first positional argument; a real tool name is always a string.
    bare_target = name if callable(name) else None
    explicit_name = None if bare_target is not None else name

    def decorator(func: Callable) -> Callable:
        func._mcp_tool = True
        func._mcp_name = explicit_name or func.__name__
        func._mcp_description = description or (func.__doc__ or "").strip()
        func._mcp_category = category
        # Copy so functions sharing one tags list do not alias each other.
        func._mcp_tags = list(tags) if tags else []
        return func

    if bare_target is not None:
        return decorator(bare_target)
    return decorator
@dataclass
class DiscoveredTool:
    """A tool discovered via introspection.

    Holds the tool's metadata, its JSON schemas, and where it was found.
    """

    # Tool name (decorator override or the function's own name).
    name: str
    # Human-readable description (decorator override or docstring).
    description: str
    # The callable itself; None when the tool was found by parsing a file
    # without importing it, so nothing callable is available yet.
    function: Optional[Callable]
    # Dotted module name, or the file path for file-based discovery.
    module: str
    # Category used for organization.
    category: str
    # Free-form tags for filtering.
    tags: list[str]
    # JSON schema describing the tool's parameters.
    parameters_schema: dict[str, Any]
    # JSON schema describing the tool's return value.
    returns_schema: dict[str, Any]
    # File the tool was defined in, if known.
    source_file: Optional[str] = None
    # Line of the definition in source_file; 0 when unknown.
    line_number: int = 0
class ToolDiscovery:
    """Discovers and registers MCP tools from Python modules.

    A tool is a callable carrying the ``_mcp_tool`` marker attribute set by
    the ``@mcp_tool`` decorator. Every ``discover_*`` call also appends its
    results to an internal cache that ``get_discovered()`` exposes.

    Usage:
        discovery = ToolDiscovery()

        # Discover from a module
        tools = discovery.discover_module("tools.git")

        # Discover from every module of a package
        tools = discovery.discover_package("tools")

        # Discover from a source file without importing it
        tools = discovery.discover_file(Path("tools/git.py"))

        # Auto-register with registry
        discovery.auto_register("tools")
    """

    # Annotation name -> JSON schema type, for annotations that are only
    # available as source text (AST discovery, unresolved string annotations).
    _AST_TYPE_MAP = {
        "str": "string",
        "int": "number",
        "float": "number",
        "bool": "boolean",
        "list": "array",
        "tuple": "array",
        "dict": "object",
        "List": "array",
        "Tuple": "array",
        "Dict": "object",
        "Optional": "object",
        "Any": "object",
    }

    def __init__(self, registry: Optional[ToolRegistry] = None) -> None:
        """Create a discovery helper.

        Args:
            registry: Registry to register into; the module-level
                ``tool_registry`` is used when omitted.
        """
        # Explicit None check: a caller-supplied registry that happens to be
        # falsy must not be silently swapped for the global one.
        self.registry = registry if registry is not None else tool_registry
        self._discovered: list[DiscoveredTool] = []

    def discover_module(self, module_name: str) -> list[DiscoveredTool]:
        """Discover all MCP tools in a module.

        The module is imported. An ImportError is logged and yields an empty
        list; any other import-time exception propagates.

        Args:
            module_name: Dotted path to module (e.g., "tools.git")

        Returns:
            List of discovered tools
        """
        discovered: list[DiscoveredTool] = []
        try:
            module = importlib.import_module(module_name)
        except ImportError as exc:
            logger.warning("Failed to import module %s: %s", module_name, exc)
            return discovered

        # Fallback source location when the object itself cannot be located
        module_file = getattr(module, "__file__", None)

        for name, obj in inspect.getmembers(module):
            # Skip private and non-callable members
            if name.startswith("_") or not callable(obj):
                continue
            # Require the exact marker value: objects with a permissive
            # __getattr__ (e.g. mocks) answer truthy for any attribute name.
            if getattr(obj, "_mcp_tool", False) is not True:
                continue

            try:
                source_file = inspect.getfile(obj)
                line_number = inspect.getsourcelines(obj)[1]
            except (OSError, TypeError):
                source_file = module_file
                line_number = 0

            # Schema generation is best-effort: a tool with an unusual
            # signature is still discovered, just with an empty schema.
            try:
                sig = self._resolved_signature(obj)
                parameters_schema = self._build_parameters_schema(sig)
                returns_schema = self._build_returns_schema(sig, obj)
            except Exception as exc:
                logger.warning("Failed to build schema for %s: %s", name, exc)
                parameters_schema = {"type": "object", "properties": {}}
                returns_schema = {}

            tool = DiscoveredTool(
                name=getattr(obj, "_mcp_name", name),
                description=getattr(obj, "_mcp_description", obj.__doc__ or ""),
                function=obj,
                module=module_name,
                category=getattr(obj, "_mcp_category", "general"),
                # Copy so later edits to the record do not leak into the function
                tags=list(getattr(obj, "_mcp_tags", [])),
                parameters_schema=parameters_schema,
                returns_schema=returns_schema,
                source_file=source_file,
                line_number=line_number,
            )
            discovered.append(tool)
            logger.debug("Discovered tool: %s from %s", tool.name, module_name)

        self._discovered.extend(discovered)
        logger.info("Discovered %d tools from module %s", len(discovered), module_name)
        return discovered

    def discover_package(self, package_name: str, recursive: bool = True) -> list[DiscoveredTool]:
        """Discover tools from all modules in a package.

        Args:
            package_name: Package name (e.g., "tools")
            recursive: Whether to search subpackages

        Returns:
            List of discovered tools
        """
        discovered: list[DiscoveredTool] = []
        try:
            package = importlib.import_module(package_name)
        except ImportError as exc:
            logger.warning("Failed to import package %s: %s", package_name, exc)
            return discovered

        package_path = getattr(package, "__path__", [])
        if not package_path:
            # Not a package, treat as module
            return self.discover_module(package_name)

        # Walk the package's direct children; subpackages are descended into
        # only when recursive, otherwise they are scanned as plain modules.
        for _, name, is_pkg in pkgutil.iter_modules(package_path, prefix=f"{package_name}."):
            if is_pkg and recursive:
                discovered.extend(self.discover_package(name, recursive=True))
            else:
                discovered.extend(self.discover_module(name))
        return discovered

    def discover_file(self, file_path: Path) -> list[DiscoveredTool]:
        """Discover tools from a Python file without importing it.

        The file is parsed with ``ast``; both ``def`` and ``async def``
        functions decorated with ``mcp_tool`` are reported. Because nothing
        is imported, each result has ``function=None``.

        Args:
            file_path: Path to Python file

        Returns:
            List of discovered tools (empty if the file cannot be read or parsed)
        """
        discovered: list[DiscoveredTool] = []
        try:
            # Parse from bytes so the source's own encoding declaration / BOM
            # is honoured instead of the platform's locale encoding.
            source = Path(file_path).read_bytes()
            tree = ast.parse(source, filename=str(file_path))
        except Exception as exc:
            # Deliberately broad: an unreadable file must not abort discovery
            logger.warning("Failed to parse %s: %s", file_path, exc)
            return discovered

        for node in ast.walk(tree):
            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            metadata = self._extract_tool_metadata(node)
            if metadata is None:
                continue

            # We can't get the actual function without importing,
            # so the record carries a None placeholder instead.
            tool = DiscoveredTool(
                name=metadata["name"],
                description=metadata["description"],
                function=None,  # Will be resolved when registered
                module=str(file_path),
                category=metadata["category"],
                tags=metadata["tags"],
                parameters_schema=self._build_schema_from_ast(node),
                returns_schema={"type": "object"},
                source_file=str(file_path),
                line_number=node.lineno,
            )
            discovered.append(tool)

        self._discovered.extend(discovered)
        logger.info("Discovered %d tools from file %s", len(discovered), file_path)
        return discovered

    def auto_register(self, package_name: str = "tools") -> list[str]:
        """Automatically discover and register tools.

        Tools without a resolved function are skipped, and a failure to
        register one tool is logged without stopping the others.

        Args:
            package_name: Package to scan for tools

        Returns:
            List of registered tool names
        """
        discovered = self.discover_package(package_name)
        registered: list[str] = []
        for tool in discovered:
            if tool.function is None:
                logger.warning("Skipping %s: no function resolved", tool.name)
                continue
            try:
                self.registry.register_tool(
                    name=tool.name,
                    function=tool.function,
                    description=tool.description,
                    category=tool.category,
                    tags=tool.tags,
                )
                registered.append(tool.name)
                logger.debug("Registered tool: %s", tool.name)
            except Exception as exc:
                logger.error("Failed to register %s: %s", tool.name, exc)
        logger.info("Auto-registered %d/%d tools", len(registered), len(discovered))
        return registered

    def _resolved_signature(self, func: Callable) -> inspect.Signature:
        """Return func's signature with string annotations resolved to types.

        Annotations written as strings (or postponed by
        ``from __future__ import annotations``) reach ``inspect.signature``
        as plain text; ``get_type_hints`` turns them back into real types.
        If resolution fails, the raw signature is returned unchanged.
        """
        sig = inspect.signature(func)
        # get_type_hints on a class reports class attributes rather than
        # call parameters, so only resolve for plain functions and methods.
        if not (inspect.isfunction(func) or inspect.ismethod(func)):
            return sig
        try:
            hints = get_type_hints(func)
        except Exception:
            # Unresolvable forward reference etc.: keep the raw annotations
            return sig
        parameters = [
            param.replace(annotation=hints.get(param.name, param.annotation))
            for param in sig.parameters.values()
        ]
        return sig.replace(
            parameters=parameters,
            return_annotation=hints.get("return", sig.return_annotation),
        )

    def _build_parameters_schema(self, sig: inspect.Signature) -> dict[str, Any]:
        """Build JSON schema for function parameters.

        ``*args`` / ``**kwargs`` are omitted. Parameters without a default
        are listed under "required"; the others carry their default value.
        """
        properties: dict[str, Any] = {}
        required: list[str] = []
        for name, param in sig.parameters.items():
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue
            schema = self._type_to_schema(param.annotation)
            if param.default is param.empty:
                required.append(name)
            else:
                schema["default"] = param.default
            properties[name] = schema
        return {
            "type": "object",
            "properties": properties,
            "required": required,
        }

    def _build_returns_schema(
        self, sig: inspect.Signature, func: Callable
    ) -> dict[str, Any]:
        """Build JSON schema for return type ("object" when unannotated)."""
        return_annotation = sig.return_annotation
        if return_annotation is sig.empty:
            return {"type": "object"}
        return self._type_to_schema(return_annotation)

    def _extract_tool_metadata(self, node: ast.AST) -> Optional[dict[str, Any]]:
        """Read ``mcp_tool`` decorator metadata from a function AST node.

        Recognises ``@mcp_tool``, ``@mcp_tool(...)`` and the attribute forms
        ``@pkg.mcp_tool`` / ``@pkg.mcp_tool(...)``. Only literal arguments
        can be read; anything else keeps the default.

        Returns:
            Dict with "name", "description", "category" and "tags", or None
            when the function is not decorated with ``mcp_tool``.
        """
        metadata: dict[str, Any] = {
            "name": node.name,
            "description": ast.get_docstring(node) or "",
            "category": "general",
            "tags": [],
        }
        is_tool = False
        for decorator in node.decorator_list:
            is_call = isinstance(decorator, ast.Call)
            target = decorator.func if is_call else decorator
            if isinstance(target, ast.Name):
                target_name = target.id
            elif isinstance(target, ast.Attribute):
                target_name = target.attr
            else:
                continue
            if target_name != "mcp_tool":
                continue
            is_tool = True
            if not is_call:
                continue

            # Positional arguments follow the decorator's parameter order
            supplied = list(zip(("name", "description", "category", "tags"), decorator.args))
            supplied.extend((kw.arg, kw.value) for kw in decorator.keywords)
            for key, value in supplied:
                if key == "tags":
                    if isinstance(value, (ast.List, ast.Tuple)):
                        metadata["tags"] = [
                            elt.value for elt in value.elts
                            if isinstance(elt, ast.Constant)
                        ]
                    continue
                if key not in ("name", "description", "category"):
                    continue
                if not (isinstance(value, ast.Constant) and isinstance(value.value, str)):
                    continue
                # The decorator falls back to the function name / docstring
                # for empty name or description, so do the same here.
                if value.value or key == "category":
                    metadata[key] = value.value
        return metadata if is_tool else None

    def _build_schema_from_ast(self, node: ast.FunctionDef) -> dict[str, Any]:
        """Build parameter schema from a function AST node.

        Covers positional-only, regular and keyword-only parameters;
        ``*args`` / ``**kwargs`` are omitted. Literal defaults are recorded
        under "default"; parameters without a default are "required".
        """
        properties: dict[str, Any] = {}
        required: list[str] = []
        fn_args = node.args

        # ast stores positional defaults right-aligned across
        # posonlyargs + args; pad the front with None ("no default").
        positional = list(getattr(fn_args, "posonlyargs", [])) + list(fn_args.args)
        missing = len(positional) - len(fn_args.defaults)
        positional_defaults = [None] * missing + list(fn_args.defaults)
        # kw_defaults already lines up with kwonlyargs (None = no default)
        pairs = list(zip(positional, positional_defaults))
        pairs.extend(zip(fn_args.kwonlyargs, fn_args.kw_defaults))

        for arg, default in pairs:
            schema: dict[str, Any] = {
                "type": self._annotation_node_to_json_type(arg.annotation)
            }
            if default is None:
                required.append(arg.arg)
            else:
                try:
                    schema["default"] = ast.literal_eval(default)
                except (ValueError, TypeError, SyntaxError, RecursionError):
                    # Non-literal default (call, name, ...): has one, value unknown
                    pass
            properties[arg.arg] = schema
        return {
            "type": "object",
            "properties": properties,
            "required": required,
        }

    def _annotation_node_to_json_type(self, annotation: Optional[ast.AST]) -> str:
        """Map an annotation AST node to a JSON schema type name.

        A missing annotation maps to "string"; annotations that cannot be
        interpreted map to "object".
        """
        if annotation is None:
            return "string"
        if isinstance(annotation, ast.Constant):
            if isinstance(annotation.value, str):
                # String annotation: parse its text and interpret the result
                try:
                    parsed = ast.parse(annotation.value, mode="eval").body
                except (SyntaxError, ValueError):
                    return self._ast_type_to_json_type(annotation.value)
                return self._annotation_node_to_json_type(parsed)
            return self._ast_type_to_json_type(str(annotation.value))
        if isinstance(annotation, ast.Name):
            return self._ast_type_to_json_type(annotation.id)
        if isinstance(annotation, ast.Attribute):
            # e.g. typing.List -> "List"
            return self._ast_type_to_json_type(annotation.attr)
        if isinstance(annotation, ast.Subscript):
            base = annotation.value
            if isinstance(base, ast.Name):
                base_name = base.id
            else:
                base_name = getattr(base, "attr", "")
            if base_name == "Optional":
                # Optional[T] is described by T
                return self._annotation_node_to_json_type(annotation.slice)
            return self._ast_type_to_json_type(base_name)
        if isinstance(annotation, ast.BinOp) and isinstance(annotation.op, ast.BitOr):
            # PEP 604 union: "T | None" is described by T
            members = [
                side for side in (annotation.left, annotation.right)
                if not (isinstance(side, ast.Constant) and side.value is None)
            ]
            if len(members) == 1:
                return self._annotation_node_to_json_type(members[0])
            return "object"
        return "object"

    def _type_to_schema(self, annotation: Any) -> dict[str, Any]:
        """Convert Python type annotation to JSON schema.

        Missing annotations map to "string", ``Optional[T]`` to the schema
        of ``T``, sequences to "array", and anything unrecognised to "object".
        """
        if annotation is inspect.Parameter.empty:
            return {"type": "string"}

        # Annotation that is still a string (could not be resolved to a
        # type): interpret its text on a best-effort basis.
        if isinstance(annotation, str):
            text_node = ast.Constant(value=annotation)
            return {"type": self._annotation_node_to_json_type(text_node)}

        origin = get_origin(annotation)
        args = get_args(annotation)

        # Handle Optional[T] = Union[T, None], including the "T | None" form
        union_type = getattr(types, "UnionType", None)
        if origin is Union or (union_type is not None and origin is union_type):
            non_none_args = [a for a in args if a is not type(None)]
            if type(None) in args and len(non_none_args) == 1:
                return self._type_to_schema(non_none_args[0])
            return {"type": "object"}

        # Handle List[T], Tuple[T, ...], Dict[K, V]
        if origin in (list, tuple):
            items_schema = {"type": "object"}
            if args:
                items_schema = self._type_to_schema(args[0])
            return {"type": "array", "items": items_schema}
        if origin is dict:
            return {"type": "object"}

        # Handle basic types. bool is listed separately from int on purpose:
        # identity comparison keeps it from being reported as a number.
        basic_types = (
            (str, "string"),
            (int, "number"),
            (float, "number"),
            (bool, "boolean"),
            (list, "array"),
            (tuple, "array"),
            (dict, "object"),
        )
        for python_type, json_type in basic_types:
            if annotation is python_type:
                return {"type": json_type}
        return {"type": "object"}

    def _ast_type_to_json_type(self, type_name: str) -> str:
        """Convert AST type name to JSON schema type ("object" if unknown)."""
        return self._AST_TYPE_MAP.get(type_name, "object")

    def get_discovered(self) -> list[DiscoveredTool]:
        """Get all discovered tools (a copy of the internal cache)."""
        return list(self._discovered)

    def clear(self) -> None:
        """Clear discovered tools cache."""
        self._discovered.clear()
# Shared ToolDiscovery instance, created lazily by get_discovery()
discovery: Optional[ToolDiscovery] = None


def get_discovery(registry: Optional[ToolRegistry] = None) -> ToolDiscovery:
    """Return the shared ToolDiscovery, creating it on first use.

    Args:
        registry: Registry handed to the ToolDiscovery constructor. It is
            only consulted when the shared instance does not exist yet;
            later calls return the existing instance and ignore it.

    Returns:
        The module-wide ToolDiscovery instance.
    """
    global discovery
    if discovery is not None:
        return discovery
    discovery = ToolDiscovery(registry=registry)
    return discovery