Source code for yoker.tools.list
"""List tool implementation for Yoker.
Provides the ListTool for listing directory contents with configurable
depth, entry limits, and glob pattern filtering.
"""
import fnmatch
from pathlib import Path
from typing import TYPE_CHECKING, Any
from yoker.logging import get_logger
from .base import Tool, ToolResult
if TYPE_CHECKING:
from yoker.tools.guardrails import Guardrail
log = get_logger(__name__)
[docs]
class ListTool(Tool):
"""Tool for listing directory contents.
Lists files and directories with optional recursion depth control,
entry limits, and glob pattern filtering. Returns a tree-formatted
string for LLM consumption.
When a guardrail is provided, validates parameters before listing.
"""
DEFAULT_MAX_DEPTH: int = 1
DEFAULT_MAX_ENTRIES: int = 1000
ABSOLUTE_MAX_DEPTH: int = 10
ABSOLUTE_MAX_ENTRIES: int = 5000
def __init__(self, guardrail: "Guardrail | None" = None) -> None:
"""Initialize ListTool with optional guardrail.
Args:
guardrail: Optional guardrail for parameter validation.
"""
super().__init__(guardrail=guardrail)
@property
def name(self) -> str:
return "list"
@property
def description(self) -> str:
return (
"List files and directories. "
"Supports optional recursion (max_depth), "
"entry limits (max_entries), and glob pattern filtering."
)
[docs]
def get_schema(self) -> dict[str, Any]:
"""Return Ollama-compatible schema for the list tool."""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the directory to list",
},
"max_depth": {
"type": "integer",
"description": (
"Maximum recursion depth (1 = immediate children only). "
f"Defaults to {self.DEFAULT_MAX_DEPTH}."
),
"minimum": 0,
"maximum": self.ABSOLUTE_MAX_DEPTH,
},
"max_entries": {
"type": "integer",
"description": (
f"Maximum total entries to return. Defaults to {self.DEFAULT_MAX_ENTRIES}."
),
"minimum": 1,
"maximum": self.ABSOLUTE_MAX_ENTRIES,
},
"pattern": {
"type": "string",
"description": (
'Optional glob pattern to filter entries (e.g., "*.py" for Python files)'
),
},
},
"required": ["path"],
},
},
}
[docs]
async def execute(self, **kwargs: Any) -> ToolResult:
"""List directory contents with optional filtering and limits.
Args:
**kwargs: Must contain 'path'. May contain 'max_depth',
'max_entries', and 'pattern'.
Returns:
ToolResult with formatted directory listing or error message.
"""
path_str = kwargs.get("path", "")
if not path_str:
return ToolResult(success=False, result="", error="Missing required parameter: path")
# Defense-in-depth: validate via guardrail if provided
if self._guardrail is not None:
validation = self._guardrail.validate(self.name, kwargs)
if not validation.valid:
log.info(
"list_guardrail_blocked",
path=path_str,
reason=validation.reason,
)
return ToolResult(
success=False,
result="",
error=validation.reason,
)
# Parse and clamp parameters
try:
max_depth = self._clamp(
int(kwargs.get("max_depth", self.DEFAULT_MAX_DEPTH)),
0,
self.ABSOLUTE_MAX_DEPTH,
)
max_entries = self._clamp(
int(kwargs.get("max_entries", self.DEFAULT_MAX_ENTRIES)),
1,
self.ABSOLUTE_MAX_ENTRIES,
)
except (ValueError, TypeError):
return ToolResult(success=False, result="", error="Invalid numeric parameter")
pattern = kwargs.get("pattern", "")
if pattern is None:
pattern = ""
try:
path = Path(path_str)
if not path.exists():
return ToolResult(success=False, result="", error=f"Path not found: {path_str}")
# If path is a file, return it as a single entry
if not path.is_dir():
return ToolResult(
success=True,
result=f"{path.name}\n\n1 entry total (1 file, 0 directories)",
)
# Build tree listing
lines, file_count, dir_count, truncated = self._build_tree(
path, max_depth, max_entries, pattern
)
total = file_count + dir_count
lines.append("")
lines.append(f"{total} entries total ({file_count} files, {dir_count} directories)")
if truncated:
lines.append(f"... ({truncated} more entries truncated, max_entries={max_entries})")
return ToolResult(success=True, result="\n".join(lines))
except PermissionError:
return ToolResult(success=False, result="", error=f"Permission denied: {path_str}")
except Exception as e:
return ToolResult(success=False, result="", error=f"Error listing directory: {e}")
def _clamp(self, value: int, minimum: int, maximum: int) -> int:
"""Clamp a value to a range."""
return max(minimum, min(value, maximum))
def _build_tree(
self,
root: Path,
max_depth: int,
max_entries: int,
pattern: str,
) -> tuple[list[str], int, int, int]:
"""Build tree listing.
Returns:
Tuple of (lines, file_count, dir_count, truncated_count).
"""
lines: list[str] = [str(root).rstrip("/") + "/"]
file_count = 0
dir_count = 0
entry_count = 0
truncated = 0
if max_depth == 0:
return lines, file_count, dir_count, truncated
def walk(current: Path, depth: int, prefix: str = "") -> None:
nonlocal file_count, dir_count, entry_count, truncated
if depth >= max_depth or entry_count >= max_entries:
return
try:
entries = sorted(current.iterdir(), key=lambda p: p.name.lower())
except PermissionError:
lines.append(prefix + "... (permission denied)")
return
if pattern:
entries = [e for e in entries if fnmatch.fnmatch(e.name, pattern)]
for entry in entries:
if entry_count >= max_entries:
truncated += 1
continue
# Do not follow symlinks
if entry.is_symlink():
lines.append(prefix + entry.name)
file_count += 1
entry_count += 1
continue
if entry.is_dir():
lines.append(prefix + entry.name + "/")
dir_count += 1
entry_count += 1
walk(entry, depth + 1, prefix + " ")
else:
lines.append(prefix + entry.name)
file_count += 1
entry_count += 1
walk(root, 0)
return lines, file_count, dir_count, truncated