Source code for workflows.discovery

"""
Script
------
discovery.py

Path
----
python/hillstar/discovery.py

Purpose
-------
Workflow discovery: Find and analyze workflow.json files in project directory.

Scans directory tree for workflow.json files and extracts metadata.
Used by MCP server to discover available workflows.

Inputs
------
start_path (str): Directory to search from (default: current directory)

Outputs
-------
List[str]: Absolute paths to workflow.json files
Dict: Workflow metadata (id, description, nodes, edges)

Assumptions
-----------
- workflow.json files are valid JSON
- Valid according to workflow-schema.json

Parameters
----------
None (per-workflow)

Failure Modes
-------------
- Invalid JSON ValueError
- Missing required fields KeyError
- Unreadable files IOError

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created
-------
2026-02-07

Last Edited
-----------
2026-02-07
"""

import json
import os
from typing import Any, Dict, List


[docs] class WorkflowDiscovery: """Find and analyze Hillstar workflows in a directory tree."""
[docs] @staticmethod def find_workflows( start_path: str = ".", max_depth: int = 5, ) -> List[str]: """ Find all workflow.json files in directory tree. Args: start_path: Directory to search from max_depth: Maximum directory depth to search Returns: List of absolute paths to workflow.json files """ workflows = [] start_path = os.path.abspath(start_path) for root, dirs, files in os.walk(start_path): # Check depth depth = root[len(start_path):].count(os.sep) if depth > max_depth: dirs[:] = [] # Don't recurse further continue # Skip hidden directories dirs[:] = [d for d in dirs if not d.startswith('.')] # Look for workflow.json, step_*.json, phase_*.json, pre_phase_*.json candidates = [] if 'workflow.json' in files: candidates.append('workflow.json') for f in files: if f.endswith('.json') and any( f.startswith(pfx) for pfx in ('step_', 'phase_', 'pre_phase_') ): candidates.append(f) for candidate in candidates: workflow_path = os.path.join(root, candidate) try: if WorkflowDiscovery._is_valid_workflow(workflow_path): workflows.append(workflow_path) except Exception: pass return sorted(workflows)
[docs] @staticmethod def get_workflow_info(workflow_path: str) -> Dict[str, Any]: """ Extract metadata from a workflow file. Args: workflow_path: Absolute path to workflow.json Returns: Dictionary with workflow metadata Raises: ValueError: If workflow is invalid IOError: If file cannot be read """ if not os.path.exists(workflow_path): raise IOError(f"Workflow file not found: {workflow_path}") try: with open(workflow_path) as f: workflow = json.load(f) except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON in {workflow_path}: {e}") # Extract metadata return { "path": os.path.abspath(workflow_path), "filename": os.path.basename(workflow_path), "directory": os.path.dirname(workflow_path), "id": workflow.get("id", "unknown"), "version": workflow.get("version", "1.0"), "description": workflow.get("description", ""), "node_count": len(workflow.get("graph", {}).get("nodes", {})), "edge_count": len(workflow.get("graph", {}).get("edges", [])), "uses_custom_provider": bool( workflow.get("model_config", {}).get("custom_providers") ), "preset": workflow.get("model_config", {}).get("preset"), "mode": workflow.get("model_config", {}).get("mode", "explicit"), "has_budget": bool( workflow.get("model_config", {}).get("budget") ), "checkpoints": len(workflow.get("state", {}).get("checkpoints", [])), }
[docs] @staticmethod def get_all_workflow_info( start_path: str = ".", max_depth: int = 5, ) -> List[Dict[str, Any]]: """ Find all workflows and return their metadata. Args: start_path: Directory to search from max_depth: Maximum directory depth Returns: List of workflow metadata dictionaries """ workflows = WorkflowDiscovery.find_workflows(start_path, max_depth) info_list = [] for workflow_path in workflows: try: info = WorkflowDiscovery.get_workflow_info(workflow_path) info_list.append(info) except Exception: # Skip workflows with errors continue return info_list
@staticmethod def _is_valid_workflow(workflow_path: str) -> bool: """Check if file looks like a valid workflow.""" try: with open(workflow_path) as f: workflow = json.load(f) # Minimal validation: has id and graph return "id" in workflow and "graph" in workflow except Exception: return False
[docs] @staticmethod def find_in_current_project() -> List[Dict[str, Any]]: """Find all workflows in current project (with .hillstar/ or spec/ indicators).""" # Look for indicators of Hillstar project current_dir = os.getcwd() # Check if we're in a Hillstar project has_hillstar_indicators = ( os.path.exists(os.path.join(current_dir, "python/hillstar/schemas/workflow-schema.json")) or os.path.exists(os.path.join(current_dir, ".hillstar")) or os.path.exists(os.path.join(current_dir, "workflow.json")) ) if has_hillstar_indicators: return WorkflowDiscovery.get_all_workflow_info(current_dir, max_depth=3) else: # Still look, but don't assume we're in a Hillstar project return WorkflowDiscovery.get_all_workflow_info(current_dir, max_depth=2)