Source code for execution.observability

"""
Script
------
observability.py

Path
----
python/hillstar/execution/observability.py

Purpose
-------
Comprehensive observability layer for workflow execution with progress tracking,
timestamping, PID logging, hashing, and trace generation.

Inputs
------
- workflow_id (str): Current workflow identifier
- output_dir (str): Directory for logs and traces
- total_nodes (int): Total number of nodes to execute

Outputs
-------
- Real-time progress output to stdout and log files
- Trace file with detailed execution metadata

Assumptions
-----------
- Output directories exist or can be created
- Write permissions to output_dir

Parameters
----------
- verbose: Enable detailed logging
- use_tqdm: Use tqdm progress bars (True by default)

Failure Modes
-------------
- No write permissions -> IOError
- Disk full -> IOError

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created
-------
2026-02-08

Last Edited
-----------
2026-02-17
"""

import hashlib
import json
import os
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

# tqdm is optional: record availability so callers can degrade gracefully
# to plain print()-based progress when it is not installed.
HAS_TQDM = True
try:
    from tqdm import tqdm
except ImportError:
    HAS_TQDM = False


class TqdmFileWrapper:
    """Tee tqdm output to log files while still displaying it on the terminal.

    The raw (possibly ANSI-colored) text is passed straight through to the
    original ``sys.stderr`` so the animated bar renders normally; a copy with
    ANSI escape sequences stripped is appended to two log files for a clean
    plain-text record.
    """

    def __init__(self, log_file_path: Path, audit_log_file_path: Path):
        """Initialize wrapper with log file paths.

        Args:
            log_file_path: Backwards-compat log file location
            audit_log_file_path: Audit directory log file location
        """
        self.log_file = log_file_path
        self.audit_log_file = audit_log_file_path
        # Matches ANSI escape sequences (colors, formatting, cursor control).
        self.ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
        # Captured at construction time so later writes bypass any
        # subsequent reassignment of sys.stderr.
        self.original_stderr = sys.stderr

    def write(self, text: str):
        """Write *text* to the terminal verbatim and to both logs ANSI-stripped.

        Args:
            text: Raw text from tqdm (may contain ANSI escape codes)
        """
        # Terminal gets the colored version for live display.
        self.original_stderr.write(text)

        # Logs get a cleaned copy; skip whitespace-only fragments.
        cleaned = self.ansi_escape.sub('', text)
        if not cleaned.strip():
            return
        for path in (self.log_file, self.audit_log_file):
            with open(path, 'a') as handle:
                handle.write(cleaned)

    def flush(self):
        """Flush the original stderr stream."""
        self.original_stderr.flush()
class ExecutionObserver:
    """Real-time monitoring and logging of workflow execution.

    Emits progress to stdout (optionally via a tqdm bar), appends
    human-readable log lines, and writes JSONL trace entries — each in both
    a backwards-compatible ``logs/`` / ``traces/`` layout and a structured
    ``audit/step_XX/`` layout.
    """

    def __init__(
        self,
        workflow_id: str,
        output_dir: str,
        total_nodes: int,
        use_tqdm: bool = True,
    ):
        """
        Initialize execution observer.

        Args:
            workflow_id: Workflow identifier
            output_dir: Base output directory
            total_nodes: Total nodes in workflow
            use_tqdm: Use tqdm progress bars (ignored when tqdm is missing)
        """
        self.workflow_id = workflow_id
        self.output_dir = Path(output_dir)
        self.total_nodes = total_nodes
        # `and` short-circuits, so HAS_TQDM is only consulted when requested.
        self.use_tqdm = use_tqdm and HAS_TQDM
        self.pid = os.getpid()
        self.start_time = datetime.now()
        self.workflow_start_epoch = time.time()

        # Extract step number from workflow_id (e.g. "step_04_populate_mpd" -> "04")
        self.step_num = self._extract_step_number(workflow_id)

        # Create output directories (idempotent).
        self.logs_dir = self.output_dir / "logs"
        self.traces_dir = self.output_dir / "traces"
        self.logs_dir.mkdir(parents=True, exist_ok=True)
        self.traces_dir.mkdir(parents=True, exist_ok=True)

        # Create audit/step_XX/ directory for structured audit trail.
        self.audit_dir = self.output_dir / "audit" / f"step_{self.step_num}"
        self.audit_dir.mkdir(parents=True, exist_ok=True)

        # Log file (both in logs/ for backwards compat and audit/).
        timestamp = self.start_time.strftime("%Y%m%d_%H%M%S")
        self.log_file = self.logs_dir / f"execution_{self.workflow_id}_{timestamp}.log"
        self.audit_log_file = self.audit_dir / f"log_{self.step_num}.log"

        # Trace file (detailed execution audit - both locations).
        # Backwards-compat name includes the step for discoverability,
        # sorted by timestamp.
        self.trace_file = self.traces_dir / f"trace_step_{self.step_num}_{timestamp}.jsonl"
        self.audit_trace_file = self.audit_dir / f"trace_{self.step_num}.jsonl"

        # Progress bar, routed through a wrapper so bar updates also land
        # in the log files.
        self.progress_bar = None
        self.tqdm_wrapper = None
        if self.use_tqdm:
            self.tqdm_wrapper = TqdmFileWrapper(self.log_file, self.audit_log_file)
            self.progress_bar = tqdm(
                total=total_nodes,
                desc=f"Executing {workflow_id}",
                unit="node",
                ncols=100,
                bar_format="{desc} |{bar}| [{n_fmt}/{total_fmt}] {postfix}",
                file=self.tqdm_wrapper,  # Direct output to wrapper for logging
            )

        # Node tracking state.
        self.nodes_completed = 0
        self.nodes_failed = 0
        self.node_times: Dict[str, float] = {}
        self.node_outputs: Dict[str, Dict[str, Any]] = {}

        # Log initialization event.
        self._log_entry(
            event_type="workflow_start",
            message=f"Workflow execution started: {workflow_id}",
            details={
                "workflow_id": workflow_id,
                "pid": self.pid,
                "timestamp": self.start_time.isoformat(),
                "total_nodes": total_nodes,
            },
        )

        # Print header banner.
        self._print_header()

    @staticmethod
    def _extract_step_number(workflow_id: str) -> str:
        """
        Extract step number from workflow_id.

        Examples:
            "step_04_populate_mpd" -> "04"
            "step_05_publication_extraction" -> "05"
            "unknown_workflow" -> "00"
        """
        match = re.search(r"step_(\d+)", workflow_id)
        return match.group(1) if match else "00"

    def _print_header(self):
        """Print execution header to stdout and log file."""
        separator = "=" * 80
        header_lines = [
            "",
            separator,
            f"[RUN] Executing: {self.workflow_id}",
            f"[DIR] Output: {self.output_dir.absolute()}",
            f" PID: {self.pid}",
            f" Started: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}",
            separator,
            "",
        ]

        # Print to stdout.
        for line in header_lines:
            print(line)

        # Also write to log file (human-readable header).
        with open(self.log_file, "a") as f:
            for line in header_lines:
                f.write(line + "\n")

    def node_start(self, node_id: str, node_index: int):
        """Record node execution start."""
        self.node_start_time = time.time()
        self.current_node_id = node_id

        progress_msg = f"[{node_index}/{self.total_nodes}] {node_id}"
        if self.progress_bar:
            self.progress_bar.set_postfix_str(progress_msg)
        else:
            print(f" {progress_msg}...", end=" ", flush=True)

        self._log_entry(
            event_type="node_start",
            node_id=node_id,
            node_index=node_index,
            details={"epoch_seconds": time.time()},
        )

    def node_success(
        self,
        node_id: str,
        duration_seconds: float,
        output_hash: Optional[str] = None,
        output_summary: Optional[Dict] = None,
    ):
        """Record node execution success."""
        self.nodes_completed += 1
        self.node_times[node_id] = duration_seconds
        if output_summary:
            self.node_outputs[node_id] = output_summary

        if self.progress_bar:
            self.progress_bar.update(1)
        else:
            print(f" ({duration_seconds:.2f}s)")

        self._log_entry(
            event_type="node_success",
            node_id=node_id,
            details={
                "duration_seconds": duration_seconds,
                "output_hash": output_hash,
                "epoch_seconds": time.time(),
                "output_summary": output_summary,
            },
        )

    def node_failure(
        self,
        node_id: str,
        error_msg: str,
        duration_seconds: float,
    ):
        """Record node execution failure."""
        self.nodes_failed += 1
        self.node_times[node_id] = duration_seconds

        if self.progress_bar:
            self.progress_bar.set_postfix_str(f" {node_id}")
        else:
            print(f" Error: {error_msg} ({duration_seconds:.2f}s)")

        self._log_entry(
            event_type="node_failure",
            node_id=node_id,
            details={
                "duration_seconds": duration_seconds,
                "error": error_msg,
                "epoch_seconds": time.time(),
            },
        )

    def workflow_complete(self, cumulative_cost_usd: float = 0.0):
        """Record workflow completion: print/log summary, write metadata.json."""
        elapsed = time.time() - self.workflow_start_epoch

        if self.progress_bar:
            self.progress_bar.close()

        # Build summary output (both stdout and log file).
        separator = "=" * 80
        summary_lines = [
            "",
            separator,
            f" Workflow completed: {self.workflow_id}",
            f" Nodes executed: {self.nodes_completed}/{self.total_nodes}",
        ]
        if self.nodes_failed > 0:
            summary_lines.append(f" Nodes failed: {self.nodes_failed}")
        summary_lines.extend([
            f" Total time: {elapsed:.2f}s ({elapsed / 60:.2f}m)",
        ])
        if cumulative_cost_usd > 0:
            summary_lines.append(f" Cost: ${cumulative_cost_usd:.4f}")
        trace_path = self.trace_file.relative_to(self.output_dir.parent)
        log_path = self.log_file.relative_to(self.output_dir.parent)
        summary_lines.extend([
            f" Trace file: {trace_path}",
            f" Log file: {log_path}",
            separator,
            "",
        ])

        # Print to stdout.
        for line in summary_lines:
            print(line)

        # Also write to log file (human-readable summary).
        with open(self.log_file, "a") as f:
            for line in summary_lines:
                f.write(line + "\n")

        self._log_entry(
            event_type="workflow_complete",
            details={
                "workflow_id": self.workflow_id,
                "nodes_completed": self.nodes_completed,
                "nodes_failed": self.nodes_failed,
                "total_time_seconds": elapsed,
                "cumulative_cost_usd": cumulative_cost_usd,
                "trace_file": str(self.trace_file),
                "log_file": str(self.log_file),
                "node_times": self.node_times,
                "epoch_seconds": time.time(),
            },
        )

        # Write metadata.json to audit directory.
        metadata = {
            "workflow_id": self.workflow_id,
            "step_number": self.step_num,
            "start_time": self.start_time.isoformat(),
            "end_time": datetime.now().isoformat(),
            "pid": self.pid,
            "total_nodes": self.total_nodes,
            "nodes_completed": self.nodes_completed,
            "nodes_failed": self.nodes_failed,
            "total_time_seconds": elapsed,
            "cumulative_cost_usd": cumulative_cost_usd,
            "node_times": self.node_times,
            "log_file": str(self.audit_log_file),
            "trace_file": str(self.audit_trace_file),
        }
        metadata_file = self.audit_dir / f"metadata_{self.step_num}.json"
        with open(metadata_file, "w") as f:
            json.dump(metadata, f, indent=2)

    def workflow_error(self, error_msg: str):
        """Record workflow error: print/log error summary, write metadata.json."""
        elapsed = time.time() - self.workflow_start_epoch

        if self.progress_bar:
            self.progress_bar.close()

        # Build error summary output (both stdout and log file).
        separator = "=" * 80
        error_lines = [
            "",
            separator,
            f" Workflow failed: {self.workflow_id}",
            f" Error: {error_msg}",
            f" Time: {elapsed:.2f}s",
            f" Trace: {self.trace_file.relative_to(self.output_dir.parent)}",
            separator,
            "",
        ]

        # Print to stdout.
        for line in error_lines:
            print(line)

        # Also write to log file (human-readable error summary).
        with open(self.log_file, "a") as f:
            for line in error_lines:
                f.write(line + "\n")

        self._log_entry(
            event_type="workflow_error",
            details={
                "workflow_id": self.workflow_id,
                "error": error_msg,
                "total_time_seconds": elapsed,
                "nodes_completed": self.nodes_completed,
                "nodes_failed": self.nodes_failed,
                "epoch_seconds": time.time(),
            },
        )

        # Write error metadata.json to audit directory.
        metadata = {
            "workflow_id": self.workflow_id,
            "step_number": self.step_num,
            "start_time": self.start_time.isoformat(),
            "end_time": datetime.now().isoformat(),
            "pid": self.pid,
            "total_nodes": self.total_nodes,
            "nodes_completed": self.nodes_completed,
            "nodes_failed": self.nodes_failed,
            "total_time_seconds": elapsed,
            "error": error_msg,
            "node_times": self.node_times,
            "log_file": str(self.audit_log_file),
            "trace_file": str(self.audit_trace_file),
        }
        metadata_file = self.audit_dir / f"metadata_{self.step_num}.json"
        with open(metadata_file, "w") as f:
            json.dump(metadata, f, indent=2)

    def _log_entry(
        self,
        event_type: str,
        message: Optional[str] = None,
        node_id: Optional[str] = None,
        node_index: Optional[int] = None,
        details: Optional[Dict[str, Any]] = None,
    ):
        """Write one event to the trace files (JSONL) and log files (text)."""
        timestamp = datetime.now().isoformat()
        epoch = time.time()

        # Build log entry.
        entry = {
            "timestamp": timestamp,
            "epoch_seconds": epoch,
            "event_type": event_type,
            "pid": self.pid,
            "workflow_id": self.workflow_id,
        }
        # Explicit None checks so node_index == 0 is still recorded
        # (bare truthiness would silently drop it).
        if node_id is not None:
            entry["node_id"] = node_id
        if node_index is not None:
            entry["node_index"] = node_index
        if message is not None:
            entry["message"] = message
        if details:
            entry.update(details)

        # Write to trace file (JSONL format) - both backwards-compat and
        # audit locations.
        with open(self.trace_file, "a") as f:
            f.write(json.dumps(entry) + "\n")
        with open(self.audit_trace_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

        # Also write to log file (human-readable format) - both locations.
        log_line = f"[{timestamp}] {event_type.upper()}"
        if node_id is not None:
            log_line += f" {node_id}"
        if message is not None:
            log_line += f": {message}"
        if details:
            log_line += f" | {json.dumps(details)}"

        with open(self.log_file, "a") as f:
            f.write(log_line + "\n")
        with open(self.audit_log_file, "a") as f:
            f.write(log_line + "\n")

    @staticmethod
    def hash_output(data: Any) -> str:
        """Generate truncated (16 hex chars) SHA256 hash of output for auditability."""
        if isinstance(data, dict):
            # sort_keys makes the hash independent of dict insertion order.
            data_str = json.dumps(data, sort_keys=True)
        else:
            data_str = str(data)
        return hashlib.sha256(data_str.encode()).hexdigest()[:16]