Workflow Execution¶
execution.runner¶
execution.graph¶
Script¶
graph.py
Path¶
python/hillstar/graph.py
Purpose¶
Graph Execution Engine: DAG-based workflow runner with checkpointing.
Implements topological sort, cycle detection, and state management for directed acyclic graph (DAG) workflows. Supports node execution, checkpoint creation, and full auditability via trace logging.
Inputs¶
workflow_json (dict): Workflow definition with nodes, edges, state, permissions
Outputs¶
Workflow execution state (node_outputs, trace, execution_order)
Assumptions¶
Workflow is a valid DAG (no cycles)
Node inputs can reference previous node outputs via {{ node_id.output }} syntax
Permissions are specified per node (ask, always, never)
Checkpoints created at specified nodes only
Parameters¶
None (class-based)
Failure Modes¶
Cycle detected in graph ValueError
Invalid node reference KeyError
Missing required node ValueError
Author: Julen Gamboa <julen.gamboa.ds@gmail.com>
Created¶
2026-02-07
Last Edited¶
2026-02-08 (error detection in execute_node)
- class execution.graph.WorkflowGraph[source]¶
Bases:
objectDirected Acyclic Graph (DAG) workflow executor.
execution.node_executor¶
execution.trace¶
Script¶
trace.py
Path¶
python/hillstar/trace.py
Purpose¶
Trace Logger: Comprehensive audit trail for workflow execution.
Logs all events (node execution, errors, model calls) to JSONL file for auditability and reproducibility. Timestamps all events automatically.
Inputs¶
output_dir (str): Directory to store trace files
Outputs¶
Trace file (JSONL): One JSON object per line, each representing an event
Assumptions¶
Output directory exists or can be created
Write permissions to output_dir
Parameters¶
None (append-only logging)
Failure Modes¶
No write permissions IOError
Disk full IOError
Author: Julen Gamboa <julen.gamboa.ds@gmail.com>
Created¶
2026-02-07
Last Edited¶
2026-02-08 (enforce traces/ subdirectory)
- class execution.trace.TraceLogger[source]¶
Bases:
objectLog all workflow executions for auditability and reproducibility.
- __init__(output_dir)[source]¶
- Parameters:
output_dir (str) – Directory to store trace files (will use output_dir/traces/)
execution.checkpoint¶
Script¶
checkpoint.py
Path¶
python/hillstar/checkpoint.py
Purpose¶
Checkpoint Manager: Save and restore workflow state for replay and recovery.
Creates JSON checkpoints after specified nodes complete, allowing workflows to be resumed from intermediate states. Supports full state export/import.
Inputs¶
output_dir (str): Directory to store checkpoints node_id (str): Node completing execution state (dict): Workflow state to save
Outputs¶
Checkpoint files (JSON): One checkpoint per node
Assumptions¶
Output directory exists or can be created
Write permissions to output_dir
Parameters¶
None (per-node checkpointing)
Failure Modes¶
No write permissions IOError
Corrupt checkpoint file json.JSONDecodeError
Missing checkpoint FileNotFoundError
Author: Julen Gamboa <julen.gamboa.ds@gmail.com>
Created¶
2026-02-07
Last Edited¶
2026-02-07
execution.observability¶
Script¶
observability.py
Path¶
python/hillstar/execution/observability.py
Purpose¶
Comprehensive observability layer for workflow execution with progress tracking, timestamping, PID logging, hashing, and trace generation.
Inputs¶
workflow_id (str): Current workflow identifier
output_dir (str): Directory for logs and traces
total_nodes (int): Total number of nodes to execute
Outputs¶
Real-time progress output to stdout and log files
Trace file with detailed execution metadata
Assumptions¶
Output directories exist or can be created
Write permissions to output_dir
Parameters¶
verbose: Enable detailed logging
use_tqdm: Use tqdm progress bars (True by default)
Failure Modes¶
No write permissions IOError
Disk full IOError
Author: Julen Gamboa <julen.gamboa.ds@gmail.com>
Created¶
2026-02-08
Last Edited¶
2026-02-17
- class execution.observability.TqdmFileWrapper[source]¶
Bases:
objectWrapper that captures tqdm output to log files while displaying on terminal.
Strips ANSI escape codes before writing to log files for cleaner output, while preserving colored/animated bar on stdout/stderr for real-time viewing.
- class execution.observability.ExecutionObserver[source]¶
Bases:
objectReal-time monitoring and logging of workflow execution.
- __init__(workflow_id, output_dir, total_nodes, use_tqdm=True)[source]¶
Initialize execution observer.
- node_success(node_id, duration_seconds, output_hash=None, output_summary=None)[source]¶
Record node execution success.
execution.config_validator¶
Script¶
config_validator.py
Path¶
execution/config_validator.py
Purpose¶
Config Validator: Validate model configuration, load environment files, and manage API key retrieval.
Extracted from WorkflowRunner to separate configuration concerns from execution logic. Validates coherence of model config, loads .env files, and provides API key management.
Inputs¶
model_config (dict): Model configuration to validate graph (WorkflowGraph): Workflow graph for schema access trace_logger (TraceLogger): Logger for warnings provider (str): Provider name for API key lookup
Outputs¶
validated (bool): True if config passes validation (raises on failure) api_key (str|None): API key from config or environment None (side effects): Logs warnings, loads environment variables
Assumptions¶
Workflow file is valid JSON matching schema
.env file exists or environment is pre-configured
API keys are stored in config file or environment variables
Parameters¶
None (per-workflow via model_config and graph)
Failure Modes¶
Invalid mode/preset combination ConfigurationError
Budget constraints incoherent ConfigurationError
Allowlist/blocklist overlap ConfigurationError
API key not found Return None (model handles error)
.env file missing Silently ignore
Author: Julen Gamboa <julen.gamboa.ds@gmail.com>
Created¶
2026-02-22
Last Edited¶
2026-02-22
- class execution.config_validator.ConfigValidator[source]¶
Bases:
objectValidate model configuration and manage API key retrieval.
- __init__(model_config, graph, trace_logger)[source]¶
- Parameters:
model_config (dict) – Model configuration dict to validate
graph (WorkflowGraph) – WorkflowGraph instance for accessing workflow schema
trace_logger (TraceLogger) – TraceLogger instance for logging warnings
- static load_env_file()[source]¶
Load .env file from repo root to ensure API keys are available.
- Return type:
None
- validate_model_config()[source]¶
Validate model configuration for coherence.
- Raises:
ConfigurationError – If configuration is invalid
- Return type:
None
execution.cost_manager¶
Script¶
cost_manager.py
Path¶
execution/cost_manager.py
Purpose¶
Cost Manager: Handle cost estimation, budget checking, and cost tracking for workflow execution.
Extracted from WorkflowRunner to enable modular unit testing and cost policy changes without affecting node execution or model selection logic.
Pricing fetched from provider_registry.default.json (source of truth) via get_registry().
Inputs¶
model_config (dict): Model configuration with pricing and budget information provider (str): Provider name (anthropic, openai, local, devstral, etc.) model_name (str): Model identifier input_tokens (int): Estimated input tokens for cost calculation output_tokens (int): Estimated output tokens for cost calculation estimated_cost (float): Cost to check against budget limits node_id (str): Node identifier for error reporting cost (float): Actual cost to record
Outputs¶
estimated_cost (float): USD cost estimate for model call None (methods modify internal state): cumulative_cost_usd, node_costs dict
Assumptions¶
Pricing data is fetched from provider_registry.default.json (source of truth)
Budget constraints are coherent (max_per_task <= max_workflow)
Token estimates are reasonable approximations
Parameters¶
None (per-workflow via model_config)
Failure Modes¶
Unknown model Return 0.0 (no pricing available)
Missing budget config No budget enforcement
Registry unavailable Return 0.0 for cost estimation
Author: Julen Gamboa <julen.gamboa.ds@gmail.com>
Created¶
2026-02-22
Last Edited¶
2026-02-24
- class execution.cost_manager.CostManager[source]¶
Bases:
objectManage cost estimation, budget enforcement, and cost tracking for models.
- __init__(model_config)[source]¶
- Parameters:
model_config (dict) – Model configuration dict with budget info
- estimate_cost(provider, model_name, input_tokens, output_tokens)[source]¶
Estimate cost of a model call using provider_registry pricing.
- check_budget(estimated_cost, node_id)[source]¶
Check if cost would exceed budget limits.
- Parameters:
- Raises:
BudgetExceededError – If budget would be exceeded
- Return type:
None