# Source code for execution.config_validator

"""
Script
------
config_validator.py

Path
----
execution/config_validator.py

Purpose
-------
Config Validator: Validate model configuration, load environment files, and manage API key retrieval.

Extracted from WorkflowRunner to separate configuration concerns from execution logic.
Validates coherence of model config, loads .env files, and provides API key management.

Inputs
------
model_config (dict): Model configuration to validate
graph (WorkflowGraph): Workflow graph for schema access
trace_logger (TraceLogger): Logger for warnings
provider (str): Provider name for API key lookup

Outputs
-------
validated (bool): True if config passes validation (raises on failure)
api_key (str|None): API key from config or environment
None (side effects): Logs warnings, loads environment variables

Assumptions
-----------
- Workflow file is valid JSON matching schema
- .env file exists or environment is pre-configured
- API keys are stored in config file or environment variables

Parameters
----------
None (per-workflow via model_config and graph)

Failure Modes
-------------
- Invalid mode/preset combination -> ConfigurationError
- Budget constraints incoherent -> ConfigurationError
- Allowlist/blocklist overlap -> ConfigurationError
- API key not found -> Return None (model handles error)
- .env file missing -> Silently ignore

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created
-------
2026-02-22

Last Edited
-----------
2026-02-22
"""

import os
from pathlib import Path
from typing import Optional
from .graph import WorkflowGraph
from .trace import TraceLogger
from utils import ConfigurationError


class ConfigValidator:
    """Validate model configuration and manage API key retrieval."""

    # Presets accepted by validate_model_config().
    _VALID_PRESETS = ("minimize_cost", "balanced", "maximize_quality", "local_only")

    # Task complexity labels that trigger the preset=local_only warning.
    _DEMANDING_COMPLEXITIES = ("complex", "critical")

    # Environment variable consulted for each base provider name.
    _ENV_VAR_BY_PROVIDER = {
        "anthropic": "ANTHROPIC_API_KEY",
        "openai": "OPENAI_API_KEY",
        "mistral": "MISTRAL_API_KEY",
        "google_ai_studio": "GOOGLE_API_KEY",
    }

    # Number of directories (cwd, then its ancestors) inspected for a .env file.
    _ENV_SEARCH_DEPTH = 3

    # Suffix that marks the MCP variant of a provider (anthropic_mcp -> anthropic).
    _MCP_SUFFIX = "_mcp"

    def __init__(
        self,
        model_config: dict,
        graph: "WorkflowGraph",
        trace_logger: "TraceLogger",
    ):
        """
        Args:
            model_config: Model configuration dict to validate
            graph: WorkflowGraph instance for accessing workflow schema
            trace_logger: TraceLogger instance for logging warnings
        """
        self.model_config = model_config
        self.graph = graph
        self.trace_logger = trace_logger

    @staticmethod
    def load_env_file() -> None:
        """Load a .env file so that API keys are available in os.environ.

        The current working directory and then its ancestors are searched
        (``_ENV_SEARCH_DEPTH`` directories in total). Values from the file
        override variables that are already set. When no .env file is found
        the function returns silently, since the environment may already be
        configured.

        ``python-dotenv`` is used when it is importable; otherwise the file
        is parsed with a minimal built-in ``KEY=VALUE`` reader.
        """
        env_file = ConfigValidator._find_env_file()
        if env_file is None:
            return

        try:
            from dotenv import load_dotenv

            # override=True ensures .env values replace environment variables.
            load_dotenv(env_file, override=True)
        except ImportError:
            # dotenv not available: fall back to manual loading.
            ConfigValidator._load_env_file_manually(env_file)

    @staticmethod
    def _find_env_file() -> Optional[Path]:
        """Return the nearest .env file at or above the cwd, or None."""
        directory = Path.cwd()
        for _ in range(ConfigValidator._ENV_SEARCH_DEPTH):
            candidate = directory / ".env"
            # is_file() rather than exists(): a *directory* called .env
            # (a common virtualenv name) must not be mistaken for the file.
            if candidate.is_file():
                return candidate
            if directory.parent == directory:
                # Reached the filesystem root.
                break
            directory = directory.parent
        return None

    @staticmethod
    def _load_env_file_manually(env_file: Path) -> None:
        """Best-effort KEY=VALUE loader used when python-dotenv is missing."""
        try:
            with open(env_file, encoding="utf-8") as handle:
                lines = handle.read().splitlines()
        except (OSError, UnicodeDecodeError):
            # Loading .env is optional; an unreadable file is ignored.
            return

        for raw_line in lines:
            entry = raw_line.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue

            key, value = entry.split("=", 1)
            key = key.strip()
            if key.startswith("export "):
                # Accept shell-style "export KEY=value" lines.
                key = key[len("export "):].strip()
            if not key:
                continue

            try:
                # Always set from .env (override any existing value); trim
                # whitespace first so 'KEY = "v"' yields KEY -> v.
                os.environ[key] = value.strip().strip("\"'")
            except (OSError, ValueError):
                # One name the OS rejects must not abort the remaining lines.
                continue

    def validate_model_config(self) -> None:
        """Validate model configuration for coherence.

        An empty or missing configuration is accepted as-is. Sections that
        are present but explicitly null (``budget``, ``provider_preferences``,
        ``allowlist``, ``blocklist``) are treated as empty.

        Raises:
            ConfigurationError: If mode=preset is used without a preset, the
                preset is unknown, max_per_task_usd exceeds max_workflow_usd,
                or the provider allowlist and blocklist overlap.
        """
        if not self.model_config:
            return

        mode = self.model_config.get("mode", "explicit")
        preset = self.model_config.get("preset")

        # Validate mode/preset relationship.
        if mode == "preset" and not preset:
            raise ConfigurationError("mode=preset requires preset field")
        if preset and preset not in self._VALID_PRESETS:
            raise ConfigurationError(f"Unknown preset: {preset}")

        # Validate budget constraints.
        budget = self.model_config.get("budget") or {}
        max_per_task = budget.get("max_per_task_usd")
        max_workflow = budget.get("max_workflow_usd")
        # NOTE(review): the truthiness test means a limit of 0 is treated the
        # same as "not set"; kept as-is because 0 may be used for "no limit".
        if max_per_task and max_workflow and max_per_task > max_workflow:
            raise ConfigurationError("max_per_task_usd cannot exceed max_workflow_usd")

        # Validate provider constraints.
        provider_prefs = self.model_config.get("provider_preferences") or {}
        allowlist = set(provider_prefs.get("allowlist") or [])
        blocklist = set(provider_prefs.get("blocklist") or [])
        if allowlist & blocklist:
            raise ConfigurationError("allowlist and blocklist cannot have overlapping providers")

        # Warn if local_only is combined with demanding tasks.
        if preset == "local_only":
            self._warn_about_demanding_tasks()

    def _warn_about_demanding_tasks(self) -> None:
        """Log one warning per graph node whose complexity is complex/critical."""
        from datetime import datetime

        nodes = getattr(self.graph, "nodes", None) or {}
        for node in nodes.values():
            complexity = node.get("complexity", "moderate")
            if complexity in self._DEMANDING_COMPLEXITIES:
                self.trace_logger.log({
                    "timestamp": datetime.now().isoformat(),
                    "type": "warning",
                    "message": f"preset=local_only but found {complexity} task (may be unresolvable)",
                })

    def get_api_key_for_provider(self, provider: str) -> Optional[str]:
        """Get API key for provider from config file or environment.

        Priority:
            1. model_config parameter (for workflow-specific keys)
            2. System config via HillstarConfig
               (~/.hillstar/provider_registry.json, user config)
            3. Environment variable
            4. Return None (let model handle error)

        Args:
            provider: Provider name (e.g., "anthropic"); a trailing "_mcp"
                is ignored, so "anthropic_mcp" resolves like "anthropic".

        Returns:
            API key string or None if not found
        """
        # Strip only a trailing "_mcp" (anthropic_mcp -> anthropic); names
        # that merely contain "_mcp" elsewhere are left untouched.
        if provider.endswith(self._MCP_SUFFIX):
            base_provider = provider[: -len(self._MCP_SUFFIX)]
        else:
            base_provider = provider

        # 1. Workflow-specific configuration.
        if self.model_config:
            provider_config = self.model_config.get(base_provider)
            if isinstance(provider_config, dict):
                api_key = provider_config.get("api_key")
                if api_key:
                    return api_key

        # 2. System config file. Deliberately best-effort: a missing module
        #    or an unreadable config must not prevent the environment fallback.
        try:
            from config import HillstarConfig

            api_key = HillstarConfig().get_provider_key(base_provider)
        except Exception:
            api_key = None
        if api_key:
            return api_key

        # 3. Environment variable.
        env_var = self._ENV_VAR_BY_PROVIDER.get(base_provider)
        if env_var:
            api_key = os.getenv(env_var)
            if api_key:
                return api_key

        # 4. Not found.
        return None