Source code for yoker.tools.existence

"""Existence tool implementation for Yoker.

Provides the ExistenceTool for checking if files and folders exist with
guardrail validation, path resolution, and symlink rejection.
"""

import os
from pathlib import Path
from typing import TYPE_CHECKING, Any

from yoker.logging import get_logger
from yoker.tools.base import Tool, ToolResult

if TYPE_CHECKING:
  from yoker.tools.guardrails import Guardrail

log = get_logger(__name__)


[docs] class ExistenceTool(Tool): """Tool for checking file or folder existence. Checks whether a file or directory exists at the given path. When a guardrail is provided, validates parameters before checking. Resolves paths with realpath and rejects symlinks by default. Returns a structured result indicating existence, type (file/directory), and the resolved path for debugging. """ def __init__(self, guardrail: "Guardrail | None" = None) -> None: """Initialize ExistenceTool with optional guardrail. Args: guardrail: Optional guardrail for parameter validation. """ super().__init__(guardrail=guardrail) @property def name(self) -> str: return "existence" @property def description(self) -> str: return "Check if a file or folder exists at the given path"
[docs] def get_schema(self) -> dict[str, Any]: """Return Ollama-compatible schema for the existence tool. Returns: Dict with 'type': 'function' and function metadata. """ return { "type": "function", "function": { "name": self.name, "description": self.description, "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to check for existence", }, }, "required": ["path"], }, }, }
[docs] async def execute(self, **kwargs: Any) -> ToolResult: """Check if a file or folder exists. Steps: 1. Validate parameters via guardrail if provided. 2. Validate path parameter is a non-empty string. 3. Reject symlinks before resolving. 4. Resolve the path with os.path.realpath(). 5. Check existence and type (file or directory). 6. Return structured result with boolean existence flag. Args: **kwargs: Must contain 'path' key with path to check. Returns: ToolResult with existence check result or error message. """ path_str = kwargs.get("path", "") # Defense-in-depth: validate via guardrail if provided if self._guardrail is not None: validation = self._guardrail.validate(self.name, kwargs) if not validation.valid: log.info( "existence_guardrail_blocked", path=path_str, reason=validation.reason, ) return ToolResult( success=False, result="", error=validation.reason, ) # Validate path parameter type if not isinstance(path_str, str): log.warning("existence_invalid_path_type", path_type=type(path_str).__name__) return ToolResult( success=False, result="", error="Invalid path parameter", ) # Validate path is not empty if not path_str.strip(): log.warning("existence_empty_path") return ToolResult( success=False, result="", error="Parameter 'path' cannot be empty", ) # Reject symlinks before resolving to prevent traversal via symlinks original_path = Path(path_str) if original_path.is_symlink(): log.warning("existence_symlink_rejected", path=path_str) return ToolResult( success=False, result="", error="Path not accessible", ) # Resolve the path to normalize try: resolved = Path(os.path.realpath(path_str)) except (OSError, ValueError): log.warning("existence_invalid_path", path=path_str) return ToolResult( success=False, result="", error="Invalid path", ) # Check existence and type try: if resolved.exists(): if resolved.is_file(): path_type = "file" elif resolved.is_dir(): path_type = "directory" else: # Could be a socket, device, etc. path_type = "other" log.info( "existence_check_success", path=str(resolved), exists=True, type=path_type, ) return ToolResult( success=True, result={ "exists": True, "type": path_type, "path": str(resolved), }, ) else: log.info( "existence_check_success", path=str(resolved), exists=False, type=None, ) return ToolResult( success=True, result={ "exists": False, "type": None, "path": str(resolved), }, ) except PermissionError: log.warning("existence_permission_denied", path=str(resolved)) return ToolResult( success=False, result="", error="Path check failed", ) except OSError as e: log.error("existence_os_error", path=str(resolved), error=str(e)) return ToolResult( success=False, result="", error="Path check failed", )