Source code for execution.checkpoint

"""
Script
------
checkpoint.py

Path
----
python/hillstar/checkpoint.py

Purpose
-------
Checkpoint Manager: Save and restore workflow state for replay and recovery.

Creates JSON checkpoints after specified nodes complete, allowing workflows
to be resumed from intermediate states. Supports full state export/import.

Inputs
------
output_dir (str): Directory to store checkpoints
node_id (str): Node completing execution
state (dict): Workflow state to save

Outputs
-------
Checkpoint files (JSON): One checkpoint per node

Assumptions
-----------
- Output directory exists or can be created
- Write permissions to output_dir

Parameters
----------
None (per-node checkpointing)

Failure Modes
-------------
- No write permissions -> IOError
- Corrupt checkpoint file -> json.JSONDecodeError
- Missing checkpoint -> FileNotFoundError

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created
-------
2026-02-07

Last Edited
-----------
2026-02-07
"""

import json
import os
from datetime import datetime
from typing import Dict, Any, Optional


class CheckpointManager:
    """Manage workflow checkpoints for replay and recovery.

    Checkpoints are JSON files stored in a ``checkpoints`` subdirectory of
    the directory given to the constructor. Each file holds a ``timestamp``,
    the ``node_id`` that completed, and the saved ``state``.
    """

    def __init__(self, output_dir: str):
        """
        Create the checkpoint directory if it does not exist yet.

        Args:
            output_dir: Directory to store checkpoints
        """
        self.output_dir = os.path.join(output_dir, "checkpoints")
        os.makedirs(self.output_dir, exist_ok=True)

    @staticmethod
    def _recency_key(filepath: str):
        """Return a sort key that orders checkpoint files oldest to newest."""
        # Modification time decides; the file name breaks ties so the result
        # does not depend on directory listing order.
        return (os.path.getmtime(filepath), os.path.basename(filepath))

    def create(self, node_id: str, state: Dict[str, Any]) -> str:
        """
        Create a checkpoint after node execution.

        An existing checkpoint file is never overwritten: if a file with the
        same name already exists (same node, same second), a numeric suffix
        is appended to the new file's name.

        Args:
            node_id: Node that just completed
            state: Workflow state to save (must be JSON-serializable)

        Returns:
            Path to checkpoint file

        Raises:
            TypeError: If state is not JSON-serializable (no file is written)
            OSError: If the checkpoint file cannot be written
        """
        # Take the time once so the embedded timestamp and the file name agree.
        now = datetime.now()
        checkpoint = {
            "timestamp": now.isoformat(),
            "node_id": node_id,
            "state": state,
        }
        # Serialize before touching the filesystem so a non-serializable
        # state raises without leaving a truncated checkpoint behind.
        payload = json.dumps(checkpoint, indent=2)

        stem = f"checkpoint_{node_id}_{now.strftime('%Y%m%d_%H%M%S')}"
        checkpoint_file = os.path.join(self.output_dir, f"{stem}.json")
        attempt = 0
        while True:
            try:
                # "x" fails if the file exists instead of silently replacing it.
                f = open(checkpoint_file, "x", encoding="utf-8")
            except FileExistsError:
                attempt += 1
                checkpoint_file = os.path.join(
                    self.output_dir, f"{stem}_{attempt}.json"
                )
                continue
            with f:
                f.write(payload)
            return checkpoint_file

    def list_checkpoints(self) -> Dict[str, str]:
        """
        List all available checkpoints.

        When a node has several checkpoint files, the most recently modified
        one is reported. Files that are not valid JSON, are not JSON objects,
        or have no ``node_id`` are skipped.

        Returns:
            Mapping of node_id to the path of that node's newest checkpoint
        """
        checkpoints: Dict[str, str] = {}
        recency: Dict[str, Any] = {}
        for filename in os.listdir(self.output_dir):
            if not (filename.startswith("checkpoint_") and filename.endswith(".json")):
                continue
            filepath = os.path.join(self.output_dir, filename)
            try:
                with open(filepath, encoding="utf-8") as f:
                    data = json.load(f)
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Corrupt or partially written checkpoint: ignore it.
                continue
            if not isinstance(data, dict):
                continue
            node_id = data.get("node_id")
            if not node_id:
                continue
            rank = self._recency_key(filepath)
            if node_id not in recency or rank > recency[node_id]:
                recency[node_id] = rank
                checkpoints[node_id] = filepath
        return checkpoints

    def load(self, checkpoint_file: str) -> Dict[str, Any]:
        """
        Load a checkpoint.

        Args:
            checkpoint_file: Path to checkpoint file

        Returns:
            Checkpoint data

        Raises:
            FileNotFoundError: If the checkpoint file does not exist
            json.JSONDecodeError: If the file is not valid JSON
        """
        with open(checkpoint_file, encoding="utf-8") as f:
            return json.load(f)

    def get_latest_checkpoint(self, node_id: Optional[str] = None) -> Optional[str]:
        """
        Get most recent checkpoint.

        Args:
            node_id: Optionally filter by node (consider all nodes if None)

        Returns:
            Path to the latest checkpoint, or None if there is no checkpoint
            (for the requested node, when node_id is given)
        """
        checkpoints = self.list_checkpoints()
        if node_id:
            # A filter that matches nothing yields None rather than another
            # node's checkpoint.
            return checkpoints.get(node_id)
        if not checkpoints:
            return None
        return max(checkpoints.values(), key=self._recency_key)