Source code for workflows.validator

"""
Script
------
validator.py

Path
----
python/hillstar/validator.py

Purpose
-------
Workflow validation: Check workflows against schema, registry, and constraints.

Validates:
- JSON schema compliance
- Provider registry integration
- Provider/model availability
- Model configuration coherence
- Budget constraints
- Graph connectivity
- Compliance requirements

Inputs
------
workflow (dict): Workflow JSON
config: HillstarConfig with ProviderRegistry
registry: ProviderRegistry instance

Outputs
-------
(valid: bool, errors: List[str])

Assumptions
-----------
- Workflow is valid JSON
- ProviderRegistry is properly initialized

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created
-------
2026-02-07

Last Edited
-----------
2026-02-14
"""

import json
import os
from typing import Any, Optional, Tuple
from importlib.resources import files

from config.provider_registry import ProviderRegistry


[docs] class WorkflowValidator: """Validate Hillstar workflows against schema, registry, and constraints.""" @staticmethod def _get_schema_path(): """Get workflow schema path using importlib (works in installed packages).""" try: # Python 3.9+ with importlib.resources schema_file = files('hillstar.schemas').joinpath('workflow-schema.json') return str(schema_file) except Exception: # Fallback for dev environments return os.path.join( os.path.dirname(__file__), "../../schemas/workflow-schema.json" ) SCHEMA_PATH = None # Set dynamically in load_schema()
[docs] def __init__(self, registry: Optional[ProviderRegistry] = None): """Initialize validator with optional registry.""" self.registry = registry or ProviderRegistry()
[docs] def load_schema(self) -> dict[str, Any]: """Load the workflow schema (from installed package or dev environment).""" schema_path = self._get_schema_path() if not os.path.exists(schema_path): raise IOError(f"Schema not found: {schema_path}\n" f"Ensure python/hillstar/schemas/workflow-schema.json exists") with open(schema_path) as f: return json.load(f)
# ==================== JSON Schema Compliance ====================
[docs] def validate_schema(self, workflow: dict[str, Any]) -> Tuple[bool, list[str]]: """ Validate workflow against JSON schema. Args: workflow: Workflow dictionary Returns: (valid: bool, errors: List[str]) """ errors = [] # Required top-level fields required_fields = ["id", "graph", "provider_config"] for field in required_fields: if field not in workflow: if field == "provider_config": errors.append( f"Missing required field: {field}. " f"You must define provider_config with compliance requirements " f"(tos_accepted, audit_enabled, restricted_use_acknowledged) " f"for each provider used in the workflow." ) else: errors.append(f"Missing required field: {field}") # Validate graph structure graph = workflow.get("graph", {}) if "graph" in workflow: if "nodes" not in graph: errors.append("Graph missing 'nodes' field") if "edges" not in graph: errors.append("Graph missing 'edges' field") # Validate edges reference valid nodes if "nodes" in graph and "edges" in graph: node_ids = set(graph["nodes"].keys()) for edge in graph["edges"]: if edge.get("from") not in node_ids: errors.append(f"Edge references missing node: {edge.get('from')}") if edge.get("to") not in node_ids: errors.append(f"Edge references missing node: {edge.get('to')}") # Validate each node has a tool if "nodes" in graph: for node_id, node in graph["nodes"].items(): if "tool" not in node: errors.append(f"Node '{node_id}' missing required 'tool' field") return len(errors) == 0, errors
# ==================== Model Configuration Coherence ====================
[docs] def validate_model_config( self, model_config: dict[str, Any], ) -> Tuple[bool, list[str]]: """ Validate model_config section for coherence. Args: model_config: The model_config dictionary Returns: (valid: bool, errors: List[str]) """ errors = [] if not model_config: return True, [] # Validate mode mode = model_config.get("mode", "explicit") if mode not in ["explicit", "auto", "preset"]: errors.append(f"Invalid mode: {mode}. Must be explicit, auto, or preset") # Validate preset if mode=preset if mode == "preset": preset = model_config.get("preset") if not preset: errors.append("mode=preset requires 'preset' field") else: valid_presets = [ "minimize_cost", "balanced", "maximize_quality", "local_only", ] if preset not in valid_presets: errors.append( f"Unknown preset: {preset}. " f"Valid: {', '.join(valid_presets)}" ) # Validate budget constraints budget = model_config.get("budget", {}) if budget: max_per_task = budget.get("max_per_task_usd") max_workflow = budget.get("max_workflow_usd") if max_per_task and max_workflow: if max_per_task > max_workflow: errors.append( f"max_per_task_usd ({max_per_task}) " f"cannot exceed max_workflow_usd ({max_workflow})" ) # Validate provider preferences provider_prefs = model_config.get("provider_preferences", {}) if provider_prefs: allowlist = set(provider_prefs.get("allowlist", [])) blocklist = set(provider_prefs.get("blocklist", [])) if allowlist and blocklist: overlap = allowlist & blocklist if overlap: errors.append( f"Providers in both allowlist and blocklist: {overlap}" ) # Validate providers against registry if allowlist: available = set(self.registry.list_providers()) invalid = allowlist - available if invalid: errors.append( f"Unknown providers in allowlist: {', '.join(invalid)}. " f"Available: {', '.join(sorted(available))}" ) # Validate sampling params sampling = model_config.get("sampling_params", {}) if sampling: temp = sampling.get("temperature") if temp is not None: if not (0.0 <= temp <= 2.0): errors.append( f"Invalid temperature: {temp}. Must be 0.0 to 2.0" ) max_tokens = sampling.get("max_tokens") if max_tokens is not None and max_tokens < 1: errors.append( f"Invalid max_tokens: {max_tokens}. Must be >= 1" ) return len(errors) == 0, errors
# ==================== Graph Connectivity ====================
[docs] def validate_graph_connectivity( self, workflow: dict[str, Any], ) -> Tuple[bool, list[str]]: """ Validate workflow graph connectivity (no disconnected components). Args: workflow: Workflow dictionary Returns: (valid: bool, errors: List[str]) """ errors = [] graph = workflow.get("graph", {}) nodes = graph.get("nodes", {}) edges = graph.get("edges", []) if not nodes: return True, [] # Build adjacency list adj: dict[str, set[str]] = {node_id: set() for node_id in nodes} for edge in edges: from_node = edge.get("from") to_node = edge.get("to") if from_node in adj and to_node in adj: adj[from_node].add(to_node) adj[to_node].add(from_node) # Undirected for connectivity check # Find connected components using BFS visited = set() for start_node in nodes: if start_node in visited: continue # BFS to find component component = set() queue = [start_node] while queue: current = queue.pop(0) if current in visited: continue visited.add(current) component.add(current) for neighbor in adj[current]: if neighbor not in visited: queue.append(neighbor) # Check if component has edges (connected to others) isolated_nodes = [n for n in component if not adj[n]] if len(component) > 1 and isolated_nodes: # Node with no edges in a multi-node component errors.append( f"Node(s) {isolated_nodes} have no connections in graph" ) # Check for nodes that exist but have no edges at all isolated = [n for n, neighbors in adj.items() if not neighbors] if len(nodes) > 1 and isolated: errors.append( f"Isolated nodes (no edges): {', '.join(isolated)}" ) return len(errors) == 0, errors
# ==================== Provider Registry Integration ====================
[docs] def validate_providers( self, workflow: dict[str, Any], ) -> Tuple[bool, list[str]]: """ Validate all referenced providers and models against registry. Args: workflow: Workflow dictionary Returns: (valid: bool, errors: List[str]) """ errors = [] # Get all available providers from registry available_providers = set(self.registry.list_providers()) # Check model_config for provider preferences model_config = workflow.get("model_config", {}) provider_prefs = model_config.get("provider_preferences", {}) allowlist = set(provider_prefs.get("allowlist", [])) if allowlist: # Check allowlist only contains valid providers invalid = allowlist - available_providers if invalid: errors.append( f"Unknown providers in provider_preferences.allowlist: {invalid}" ) # Validate each node graph = workflow.get("graph", {}) for node_id, node in graph.get("nodes", {}).items(): if node.get("tool") != "model_call": continue provider = node.get("provider") model = node.get("model") if provider: # Check provider exists in registry if provider not in available_providers: errors.append( f"Node '{node_id}': Unknown provider '{provider}'" ) else: # Provider exists - validate model if specified if model: provider_config = self.registry.get_provider(provider) available_models = set( provider_config.get("models", {}).keys() if provider_config else [] ) if model not in available_models: errors.append( f"Node '{node_id}': Unknown model '{model}' " f"for provider '{provider}'. " f"Available: {', '.join(sorted(available_models))}" ) # Check provider/model compatibility if provider and model: provider_config = self.registry.get_provider(provider) if provider_config: models = provider_config.get("models", {}) if models and model not in models: errors.append( f"Node '{node_id}': Model '{model}' not in " f"provider '{provider}' registry" ) return len(errors) == 0, errors
# ==================== Compliance Checks ====================
[docs] def validate_compliance( self, workflow: dict[str, Any], ) -> Tuple[bool, list[str]]: """ Validate compliance requirements for all providers. Args: workflow: Workflow dictionary Returns: (valid: bool, errors: List[str]) """ issues = [] # Get workflow's provider config (where users accept compliance requirements) workflow_provider_config = workflow.get("provider_config", {}) for node_id, node in workflow.get("graph", {}).get("nodes", {}).items(): if node.get("tool") != "model_call": continue provider = node.get("provider") if not provider: continue provider_registry_config = self.registry.get_provider(provider) if not provider_registry_config: continue compliance = provider_registry_config.get("compliance", {}) if not compliance: continue # Get the provider's acceptance config from the workflow provider_acceptance = workflow_provider_config.get(provider, {}) # Check ToS acceptance requires_tos = compliance.get("requires_tos_acceptance", False) if requires_tos and not provider_acceptance.get("tos_accepted", False): issues.append( f"Node '{node_id}': Provider '{provider}' requires " f"ToS acceptance. See: {compliance.get('tos_url', 'N/A')}" ) # Check audit requirement if compliance.get("audit_required", False) and not provider_acceptance.get("audit_enabled", False): issues.append( f"Node '{node_id}': Provider '{provider}' requires audit logging" ) # Restricted use cases are informational, not blocking # (User can acknowledge by setting usage_type in their config) restricted = compliance.get("restricted_use_cases", []) if restricted and not provider_acceptance.get("restricted_use_acknowledged", False): # Only issue warning if not acknowledged issues.append( f"Node '{node_id}': Provider '{provider}' restricted for: " f"{', '.join(restricted)}" ) return len(issues) == 0, issues
# ==================== Complete Validation ====================
[docs] def validate_complete( self, workflow: dict[str, Any], ) -> Tuple[bool, list[str]]: """ Run all validations. Args: workflow: Workflow dictionary Returns: (valid: bool, errors: List[str]) """ all_errors = [] # Schema validation (JSON structure) schema_valid, schema_errors = self.validate_schema(workflow) all_errors.extend(schema_errors) # Model config validation (coherence) model_config = workflow.get("model_config", {}) config_valid, config_errors = self.validate_model_config(model_config) all_errors.extend(config_errors) # Graph connectivity connectivity_valid, conn_errors = self.validate_graph_connectivity(workflow) all_errors.extend(conn_errors) # Provider validation (registry integration) provider_valid, provider_errors = self.validate_providers(workflow) all_errors.extend(provider_errors) # Compliance validation compliance_valid, compliance_errors = self.validate_compliance(workflow) all_errors.extend(compliance_errors) return len(all_errors) == 0, all_errors
[docs] @staticmethod def validate_file(workflow_path: str) -> Tuple[bool, list[str]]: """ Validate a workflow file. Args: workflow_path: Path to workflow.json Returns: (valid: bool, errors: List[str]) """ try: with open(workflow_path) as f: workflow = json.load(f) except json.JSONDecodeError as e: return False, [f"Invalid JSON: {e}"] except IOError as e: return False, [f"Cannot read file: {e}"] return WorkflowValidator.validate_complete_static(workflow)
# ==================== Static Methods for Backward Compatibility ====================
[docs] @staticmethod def validate_schema_static(workflow: dict[str, Any]) -> Tuple[bool, list[str]]: """Static wrapper for validate_schema.""" return WorkflowValidator().validate_schema(workflow)
[docs] @staticmethod def validate_model_config_static(model_config: dict[str, Any]) -> Tuple[bool, list[str]]: """Static wrapper for validate_model_config.""" return WorkflowValidator().validate_model_config(model_config)
[docs] @staticmethod def validate_providers_static(workflow: dict[str, Any]) -> Tuple[bool, list[str]]: """Static wrapper for validate_providers.""" return WorkflowValidator().validate_providers(workflow)
[docs] @staticmethod def validate_complete_static(workflow: dict[str, Any]) -> Tuple[bool, list[str]]: """Static wrapper for validate_complete.""" return WorkflowValidator().validate_complete(workflow)
[docs] @staticmethod def validate_file_static(workflow_path: str) -> Tuple[bool, list[str]]: """Static wrapper for file validation.""" validator = WorkflowValidator() return validator.validate_file(workflow_path)