Workflow Execution

execution.runner

execution.graph

Script

graph.py

Path

python/hillstar/graph.py

Purpose

Graph Execution Engine: DAG-based workflow runner with checkpointing.

Implements topological sort, cycle detection, and state management for directed acyclic graph (DAG) workflows. Supports node execution, checkpoint creation, and full auditability via trace logging.
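The topological sort and cycle detection described above can be sketched with Kahn's algorithm. This is an illustrative stand-alone function, not the actual WorkflowGraph implementation; only the ValueError-on-cycle behavior matches the documented failure mode.

```python
from collections import deque

def topological_order(nodes, edges):
    """Return a valid execution order for a DAG.

    nodes: iterable of node ids; edges: list of (source, target) pairs.
    Raises ValueError when a cycle prevents a complete ordering.
    """
    indegree = {n: 0 for n in nodes}
    children = {n: [] for n in nodes}
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1

    # Start with nodes that have no unmet dependencies.
    ready = deque(n for n, d in indegree.items() if d == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in children[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # A cycle leaves some nodes with nonzero indegree forever.
    if len(order) != len(indegree):
        raise ValueError("Cycle detected in graph")
    return order
```

Python 3.9+ also ships `graphlib.TopologicalSorter` for the same job; the hand-rolled version above just makes the cycle check explicit.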

Inputs

workflow_json (dict): Workflow definition with nodes, edges, state, permissions

Outputs

Workflow execution state (node_outputs, trace, execution_order)

Assumptions

  • Workflow is a valid DAG (no cycles)

  • Node inputs can reference previous node outputs via {{ node_id.output }} syntax

  • Permissions are specified per node (ask, always, never)

  • Checkpoints created at specified nodes only
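The {{ node_id.output }} reference syntax from the assumptions above can be resolved with a recursive substitution pass. The helper below is a hypothetical sketch of the idea, not the engine's actual resolver; note that an unknown node id raises KeyError, matching the documented failure mode.

```python
import re

# Matches placeholders like "{{ fetch.output }}" and captures the node id.
REF_PATTERN = re.compile(r"\{\{\s*(\w+)\.output\s*\}\}")

def resolve_refs(value, node_outputs):
    """Replace {{ node_id.output }} placeholders with prior node outputs.

    Recurses into dicts and lists so nested node inputs are resolved too.
    """
    if isinstance(value, str):
        return REF_PATTERN.sub(
            lambda m: str(node_outputs[m.group(1)]),  # KeyError on invalid node reference
            value,
        )
    if isinstance(value, dict):
        return {k: resolve_refs(v, node_outputs) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_refs(v, node_outputs) for v in value]
    return value
```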

Parameters

None (class-based)

Failure Modes

  • Cycle detected in graph → ValueError

  • Invalid node reference → KeyError

  • Missing required node → ValueError

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created

2026-02-07

Last Edited

2026-02-08 (error detection in execute_node)

class execution.graph.WorkflowGraph[source]

Bases: object

Directed Acyclic Graph (DAG) workflow executor.

__init__(workflow_json)[source]
Parameters:

workflow_json (Dict[str, Any]) – Workflow definition (nodes + edges)

get_node_inputs(node_id)[source]

Resolve node inputs, substituting references to previous outputs.

Parameters:

node_id (str)

Return type:

Any

execute_node(node_id, executor_fn)[source]

Execute a single node.

Parameters:
  • node_id (str)

  • executor_fn (Callable) – Function invoked to execute the node

Return type:

Any

get_execution_order()[source]

Return the order in which nodes should execute.

Return type:

List[str]

get_checkpoint_nodes()[source]

Return nodes where checkpoints should be created.

Return type:

List[str]

export_state()[source]

Export complete execution state.

Return type:

Dict[str, Any]

import_state(state)[source]

Import execution state from checkpoint for resumption.

Parameters:

state (Dict[str, Any]) – State dictionary from checkpoint

Return type:

None

execution.node_executor

execution.trace

Script

trace.py

Path

python/hillstar/trace.py

Purpose

Trace Logger: Comprehensive audit trail for workflow execution.

Logs all events (node execution, errors, model calls) to a JSONL file for auditability and reproducibility. All events are timestamped automatically.
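Append-only JSONL logging with automatic timestamping can be as small as the sketch below; the function name and file layout are illustrative, not the TraceLogger API.

```python
import json
import time
from pathlib import Path

def log_event(trace_file, event):
    """Append one JSON object per line, adding a timestamp if absent."""
    if "timestamp" not in event:
        event = {**event, "timestamp": time.time()}
    trace_file = Path(trace_file)
    # Create the enclosing directory (e.g. a traces/ subdirectory) on demand.
    trace_file.parent.mkdir(parents=True, exist_ok=True)
    with trace_file.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(event) + "\n")
    return event
```

Because each event is a single line, a crashed run still leaves a parseable prefix, which is what makes JSONL a good fit for audit trails.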

Inputs

output_dir (str): Directory to store trace files

Outputs

Trace file (JSONL): One JSON object per line, each representing an event

Assumptions

  • Output directory exists or can be created

  • Write permissions to output_dir

Parameters

None (append-only logging)

Failure Modes

  • No write permissions → IOError

  • Disk full → IOError

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created

2026-02-07

Last Edited

2026-02-08 (enforce traces/ subdirectory)

class execution.trace.TraceLogger[source]

Bases: object

Log all workflow executions for auditability and reproducibility.

__init__(output_dir)[source]
Parameters:

output_dir (str) – Directory to store trace files (will use output_dir/traces/)

log(event)[source]

Log a single event.

Parameters:

event (Dict[str, Any]) – Event dictionary (will be timestamped if not present)

Return type:

None

finalize()[source]

Finalize trace and return file path.

Returns:

Path to trace file

Return type:

str

get_events()[source]

Get all logged events.

Return type:

List[Dict]

get_cost_summary()[source]

Extract cost summary from logged events.

Return type:

Dict[str, Any]

execution.checkpoint

Script

checkpoint.py

Path

python/hillstar/checkpoint.py

Purpose

Checkpoint Manager: Save and restore workflow state for replay and recovery.

Creates JSON checkpoints after specified nodes complete, allowing workflows to be resumed from intermediate states. Supports full state export/import.
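Per-node JSON checkpointing of this shape can be sketched with two small helpers. File naming and payload layout here are assumptions for illustration, not the CheckpointManager's actual format.

```python
import json
from pathlib import Path

def save_checkpoint(output_dir, node_id, state):
    """Write workflow state to a per-node JSON checkpoint; return its path."""
    ckpt_dir = Path(output_dir)
    ckpt_dir.mkdir(parents=True, exist_ok=True)
    path = ckpt_dir / f"checkpoint_{node_id}.json"
    path.write_text(json.dumps({"node_id": node_id, "state": state}, indent=2))
    return str(path)

def load_checkpoint(checkpoint_file):
    """Load a checkpoint for resumption.

    Raises FileNotFoundError for a missing file and json.JSONDecodeError
    for a corrupt one, mirroring the documented failure modes.
    """
    return json.loads(Path(checkpoint_file).read_text())
```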

Inputs

  • output_dir (str): Directory to store checkpoints

  • node_id (str): Node completing execution

  • state (dict): Workflow state to save

Outputs

Checkpoint files (JSON): One checkpoint per node

Assumptions

  • Output directory exists or can be created

  • Write permissions to output_dir

Parameters

None (per-node checkpointing)

Failure Modes

  • No write permissions → IOError

  • Corrupt checkpoint file → json.JSONDecodeError

  • Missing checkpoint → FileNotFoundError

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created

2026-02-07

Last Edited

2026-02-07

class execution.checkpoint.CheckpointManager[source]

Bases: object

Manage workflow checkpoints for replay and recovery.

__init__(output_dir)[source]
Parameters:

output_dir (str) – Directory to store checkpoints

create(node_id, state)[source]

Create a checkpoint after node execution.

Parameters:
  • node_id (str) – Node that just completed

  • state (Dict[str, Any]) – Workflow state to save

Returns:

Path to checkpoint file

Return type:

str

list_checkpoints()[source]

List all available checkpoints.

Return type:

Dict[str, str]

load(checkpoint_file)[source]

Load a checkpoint.

Parameters:

checkpoint_file (str) – Path to checkpoint file

Returns:

Checkpoint data

Return type:

Dict[str, Any]

get_latest_checkpoint(node_id=None)[source]

Get most recent checkpoint.

Parameters:

node_id (str | None) – Optionally filter by node (get all if None)

Returns:

Path to latest checkpoint or None

Return type:

str | None

execution.observability

Script

observability.py

Path

python/hillstar/execution/observability.py

Purpose

Comprehensive observability layer for workflow execution with progress tracking, timestamping, PID logging, hashing, and trace generation.

Inputs

  • workflow_id (str): Current workflow identifier

  • output_dir (str): Directory for logs and traces

  • total_nodes (int): Total number of nodes to execute

Outputs

  • Real-time progress output to stdout and log files

  • Trace file with detailed execution metadata

Assumptions

  • Output directories exist or can be created

  • Write permissions to output_dir

Parameters

  • verbose: Enable detailed logging

  • use_tqdm: Use tqdm progress bars (True by default)

Failure Modes

  • No write permissions → IOError

  • Disk full → IOError

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created

2026-02-08

Last Edited

2026-02-17

class execution.observability.TqdmFileWrapper[source]

Bases: object

Wrapper that captures tqdm output to log files while displaying on terminal.

Strips ANSI escape codes before writing to log files for cleaner output, while preserving the colored/animated bar on stdout/stderr for real-time viewing.

__init__(log_file_path, audit_log_file_path)[source]

Initialize wrapper with log file paths.

Parameters:
  • log_file_path (Path) – Backwards-compat log file location

  • audit_log_file_path (Path) – Audit directory log file location

write(text)[source]

Write text to log files (with ANSI stripped) and original stderr.

Parameters:

text (str) – Raw text from tqdm (may contain ANSI escape codes)

flush()[source]

Flush the original stderr.
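The ANSI stripping described for TqdmFileWrapper comes down to one regex over CSI escape sequences; a minimal sketch of the idea (not the class's actual implementation):

```python
import re

# Matches CSI sequences such as colors ("\x1b[32m") and cursor moves ("\x1b[1A").
ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;?]*[ -/]*[@-~]")

def strip_ansi(text):
    """Remove ANSI escape sequences so log files stay plain text."""
    return ANSI_ESCAPE.sub("", text)
```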

class execution.observability.ExecutionObserver[source]

Bases: object

Real-time monitoring and logging of workflow execution.

__init__(workflow_id, output_dir, total_nodes, use_tqdm=True)[source]

Initialize execution observer.

Parameters:
  • workflow_id (str) – Workflow identifier

  • output_dir (str) – Base output directory

  • total_nodes (int) – Total nodes in workflow

  • use_tqdm (bool) – Use tqdm progress bars

node_start(node_id, node_index)[source]

Record node execution start.

Parameters:
  • node_id (str)

  • node_index (int)

node_success(node_id, duration_seconds, output_hash=None, output_summary=None)[source]

Record node execution success.

Parameters:
  • node_id (str)

  • duration_seconds (float)

  • output_hash (str | None)

  • output_summary (Dict | None)

node_failure(node_id, error_msg, duration_seconds)[source]

Record node execution failure.

Parameters:
  • node_id (str)

  • error_msg (str)

  • duration_seconds (float)

workflow_complete(cumulative_cost_usd=0.0)[source]

Record workflow completion.

Parameters:

cumulative_cost_usd (float)

workflow_error(error_msg)[source]

Record workflow error.

Parameters:

error_msg (str)

static hash_output(data)[source]

Generate SHA256 hash of output for auditability.

Parameters:

data (Any)

Return type:

str
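Hashing arbitrary Python output, as hash_output does, requires a canonical serialization first so equal data always yields an equal digest. One plausible sketch; `sort_keys` and the `default=str` fallback are assumptions, not the library's documented behavior:

```python
import hashlib
import json

def hash_output(data):
    """Return a SHA256 hex digest of a JSON-serializable output.

    sort_keys keeps the digest stable across dict insertion orders;
    default=str is an assumed fallback for non-JSON types.
    """
    canonical = json.dumps(data, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```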

execution.config_validator

Script

config_validator.py

Path

execution/config_validator.py

Purpose

Config Validator: Validate model configuration, load environment files, and manage API key retrieval.

Extracted from WorkflowRunner to separate configuration concerns from execution logic. Validates coherence of model config, loads .env files, and provides API key management.

Inputs

  • model_config (dict): Model configuration to validate

  • graph (WorkflowGraph): Workflow graph for schema access

  • trace_logger (TraceLogger): Logger for warnings

  • provider (str): Provider name for API key lookup

Outputs

  • validated (bool): True if config passes validation (raises on failure)

  • api_key (str | None): API key from config or environment

  • None (side effects): Logs warnings, loads environment variables

Assumptions

  • Workflow file is valid JSON matching schema

  • .env file exists or environment is pre-configured

  • API keys are stored in config file or environment variables

Parameters

None (per-workflow via model_config and graph)

Failure Modes

  • Invalid mode/preset combination → ConfigurationError

  • Budget constraints incoherent → ConfigurationError

  • Allowlist/blocklist overlap → ConfigurationError

  • API key not found → returns None (model handles error)

  • .env file missing → silently ignored

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created

2026-02-22

Last Edited

2026-02-22

class execution.config_validator.ConfigValidator[source]

Bases: object

Validate model configuration and manage API key retrieval.

__init__(model_config, graph, trace_logger)[source]
Parameters:
  • model_config (dict) – Model configuration dict to validate

  • graph (WorkflowGraph) – WorkflowGraph instance for accessing workflow schema

  • trace_logger (TraceLogger) – TraceLogger instance for logging warnings

static load_env_file()[source]

Load .env file from repo root to ensure API keys are available.

Return type:

None

validate_model_config()[source]

Validate model configuration for coherence.

Raises:

ConfigurationError – If configuration is invalid

Return type:

None

get_api_key_for_provider(provider)[source]

Get API key for provider from config file or environment.

Priority order:

  1. model_config parameter (for workflow-specific keys)

  2. ~/.hillstar/provider_registry.json (user config)

  3. Environment variable

  4. Return None (let model handle error)

Parameters:

provider (str) – Provider name (e.g., “anthropic”)

Returns:

API key string or None if not found

Return type:

str | None
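The four-step priority order above can be expressed as a fall-through chain. The paths and the environment-variable naming below follow the text; the helper itself, and the `{PROVIDER}_API_KEY` convention, are illustrative assumptions rather than the ConfigValidator's actual code.

```python
import json
import os
from pathlib import Path

def get_api_key(provider, model_config):
    """Resolve an API key: workflow config, then user registry, then environment."""
    # 1. Workflow-specific key supplied in model_config.
    key = model_config.get("api_key")
    if key:
        return key
    # 2. User config at ~/.hillstar/provider_registry.json.
    registry_path = Path.home() / ".hillstar" / "provider_registry.json"
    if registry_path.exists():
        registry = json.loads(registry_path.read_text())
        key = registry.get(provider, {}).get("api_key")
        if key:
            return key
    # 3. Environment variable, e.g. ANTHROPIC_API_KEY (naming is an assumption).
    key = os.environ.get(f"{provider.upper()}_API_KEY")
    if key:
        return key
    # 4. Give up; the model layer reports the missing-key error.
    return None
```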

execution.cost_manager

Script

cost_manager.py

Path

execution/cost_manager.py

Purpose

Cost Manager: Handle cost estimation, budget checking, and cost tracking for workflow execution.

Extracted from WorkflowRunner to enable modular unit testing and cost policy changes without affecting node execution or model selection logic.

Pricing fetched from provider_registry.default.json (source of truth) via get_registry().

Inputs

  • model_config (dict): Model configuration with pricing and budget information

  • provider (str): Provider name (anthropic, openai, local, devstral, etc.)

  • model_name (str): Model identifier

  • input_tokens (int): Estimated input tokens for cost calculation

  • output_tokens (int): Estimated output tokens for cost calculation

  • estimated_cost (float): Cost to check against budget limits

  • node_id (str): Node identifier for error reporting

  • cost (float): Actual cost to record

Outputs

  • estimated_cost (float): USD cost estimate for model call

  • None (methods modify internal state): cumulative_cost_usd, node_costs dict

Assumptions

  • Pricing data is fetched from provider_registry.default.json (source of truth)

  • Budget constraints are coherent (max_per_task <= max_workflow)

  • Token estimates are reasonable approximations

Parameters

None (per-workflow via model_config)

Failure Modes

  • Unknown model → returns 0.0 (no pricing available)

  • Missing budget config → no budget enforcement

  • Registry unavailable → returns 0.0 for cost estimation

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created

2026-02-22

Last Edited

2026-02-24

class execution.cost_manager.CostManager[source]

Bases: object

Manage cost estimation, budget enforcement, and cost tracking for models.

__init__(model_config)[source]
Parameters:

model_config (dict) – Model configuration dict with budget info

estimate_cost(provider, model_name, input_tokens, output_tokens)[source]

Estimate cost of a model call using provider_registry pricing.

Parameters:
  • provider (str) – Provider name (anthropic, openai, local, devstral_local, etc.)

  • model_name (str) – Model name/API ID

  • input_tokens (int) – Estimated input tokens

  • output_tokens (int) – Estimated output tokens

Returns:

Estimated cost in USD (0.0 if pricing not available)

Return type:

float
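Estimation of this kind typically multiplies token counts by per-million-token prices. The sketch below uses a hypothetical pricing table; the real numbers come from provider_registry.default.json via get_registry(), and the model name shown is invented.

```python
# Hypothetical pricing table in USD per million tokens; the real source
# of truth is provider_registry.default.json via get_registry().
PRICING = {
    ("anthropic", "example-model"): {"input": 3.0, "output": 15.0},
}

def estimate_cost(provider, model_name, input_tokens, output_tokens):
    """Return estimated USD cost, or 0.0 when pricing is unavailable."""
    prices = PRICING.get((provider, model_name))
    if prices is None:
        # Matches the documented "unknown model" failure mode.
        return 0.0
    return (
        input_tokens / 1_000_000 * prices["input"]
        + output_tokens / 1_000_000 * prices["output"]
    )
```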

check_budget(estimated_cost, node_id)[source]

Check if cost would exceed budget limits.

Parameters:
  • estimated_cost (float) – Estimated cost of this call in USD

  • node_id (str) – Node ID for logging

Raises:

BudgetExceededError – If budget would be exceeded

Return type:

None
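Budget enforcement of this shape tracks cumulative spend and raises before the call is made. A hedged sketch: the BudgetExceededError class and limit names below stand in for the package's own exception and config keys.

```python
class BudgetExceededError(Exception):
    """Stand-in for the package's budget exception."""

class BudgetTracker:
    """Track cumulative spend against per-task and whole-workflow limits."""

    def __init__(self, max_per_task=None, max_workflow=None):
        self.max_per_task = max_per_task
        self.max_workflow = max_workflow
        self.cumulative_cost_usd = 0.0

    def check(self, estimated_cost, node_id):
        """Raise if this call would break a per-task or workflow limit."""
        if self.max_per_task is not None and estimated_cost > self.max_per_task:
            raise BudgetExceededError(f"{node_id}: per-task budget exceeded")
        if (self.max_workflow is not None
                and self.cumulative_cost_usd + estimated_cost > self.max_workflow):
            raise BudgetExceededError(f"{node_id}: workflow budget exceeded")

    def record(self, cost):
        """Record actual spend after the call completes."""
        self.cumulative_cost_usd += cost
```

Checking against the estimate before the call, then recording the actual cost after, is what lets the runner stop a workflow without paying for the call that would have exceeded the budget.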

record_cost(node_id, cost)[source]

Record actual cost for a node.

Parameters:
  • node_id (str) – Node identifier

  • cost (float) – Actual cost to record in USD

Return type:

None

execution.model_selector