Source code for execution.cost_manager
"""
Script
------
cost_manager.py
Path
----
execution/cost_manager.py
Purpose
-------
Cost Manager: Handle cost estimation, budget checking, and cost tracking for workflow execution.
Extracted from WorkflowRunner to enable modular unit testing and cost policy changes
without affecting node execution or model selection logic.
Pricing fetched from provider_registry.default.json (source of truth) via get_registry().
Inputs
------
model_config (dict): Model configuration with pricing and budget information
provider (str): Provider name (anthropic, openai, local, devstral_local, ollama, etc.)
model_name (str): Model identifier
input_tokens (int): Estimated input tokens for cost calculation
output_tokens (int): Estimated output tokens for cost calculation
estimated_cost (float): Cost to check against budget limits
node_id (str): Node identifier for error reporting
cost (float): Actual cost to record
Outputs
-------
estimated_cost (float): USD cost estimate for model call
None (methods modify internal state): cumulative_cost_usd, node_costs dict
Assumptions
-----------
- Pricing data is fetched from provider_registry.default.json (source of truth)
- Budget constraints are coherent (max_per_task <= max_workflow)
- Token estimates are reasonable approximations
Parameters
----------
None (per-workflow via model_config)
Failure Modes
-------------
- Unknown model -> return 0.0 (no pricing available)
- Missing budget config -> no budget enforcement
- Registry unavailable -> return 0.0 for cost estimation
Author: Julen Gamboa <julen.gamboa.ds@gmail.com>
Created
-------
2026-02-22
Last Edited
-----------
2026-02-24
"""
from utils import BudgetExceededError
from config.provider_registry import get_registry
[docs]
class CostManager:
    """Manage cost estimation, budget enforcement, and cost tracking for models.

    State kept per instance:
        model_config: The configuration dict passed to ``__init__``; its
            optional ``"budget"`` entry is read by ``check_budget``.
        cumulative_cost_usd: Running total of every cost passed to
            ``record_cost``.
        node_costs: Mapping of node_id -> total recorded cost for that node.
    """

    # Providers that are never charged; estimate_cost short-circuits to 0.0
    # for these without consulting the registry.
    _FREE_PROVIDERS = frozenset({"devstral_local", "local", "ollama"})

    def __init__(self, model_config: dict):
        """
        Args:
            model_config: Model configuration dict with budget info
        """
        self.model_config = model_config
        self.cumulative_cost_usd = 0.0
        self.node_costs: dict[str, float] = {}  # node_id -> cost_usd

    def estimate_cost(
        self,
        provider: str,
        model_name: str,
        input_tokens: int,
        output_tokens: int,
    ) -> float:
        """
        Estimate cost of a model call using provider_registry pricing.

        Args:
            provider: Provider name (anthropic, openai, local, devstral_local, etc.)
            model_name: Model name/API ID
            input_tokens: Estimated input tokens
            output_tokens: Estimated output tokens

        Returns:
            Estimated cost in USD. 0.0 is returned for free/local providers,
            when the model or its pricing is not found in the registry, and
            when the registry lookup fails for any reason.
        """
        # Local/free models have no charge
        if provider in self._FREE_PROVIDERS:
            return 0.0

        try:
            # Get pricing from registry (source of truth)
            registry = get_registry()
            model_entry = registry.get_model(provider, model_name)

            # If model not found in registry, return 0.0
            if not model_entry:
                return 0.0

            # Extract pricing from model config
            pricing = model_entry.get("pricing", {})
            input_cost_per_1m = pricing.get("input_per_1m_usd")
            output_cost_per_1m = pricing.get("output_per_1m_usd")

            # If pricing not available, return 0.0
            if input_cost_per_1m is None or output_cost_per_1m is None:
                return 0.0

            # Calculate cost based on tokens (pricing is per 1M tokens)
            input_cost = (input_tokens / 1_000_000) * input_cost_per_1m
            output_cost = (output_tokens / 1_000_000) * output_cost_per_1m
            return input_cost + output_cost
        except Exception:
            # Deliberately best-effort: a registry problem must not stop
            # execution. Log it, though, because a 0.0 estimate means the
            # budget checks cannot block this call.
            import logging

            logging.getLogger(__name__).warning(
                "Cost estimation failed for provider=%s model=%s; assuming 0.0",
                provider,
                model_name,
                exc_info=True,
            )
            return 0.0

    def check_budget(
        self,
        estimated_cost: float,
        node_id: str,
    ) -> None:
        """
        Check if cost would exceed budget limits.

        A limit that is absent (or None) is not enforced. A limit of 0 IS
        enforced: any call with a positive estimated cost is rejected.

        Args:
            estimated_cost: Estimated cost of this call in USD
            node_id: Node ID for logging

        Raises:
            BudgetExceededError: If budget would be exceeded
        """
        # "budget" may be missing or explicitly None; both mean "no limits".
        budget = (self.model_config or {}).get("budget") or {}
        max_per_task = budget.get("max_per_task_usd")
        max_workflow = budget.get("max_workflow_usd")

        # Compare against None rather than truthiness so a 0 limit is honored.
        if max_per_task is not None and estimated_cost > max_per_task:
            raise BudgetExceededError(
                f"Node {node_id}: estimated cost ${estimated_cost:.4f} "
                f"exceeds per-task limit ${max_per_task}"
            )

        if (
            max_workflow is not None
            and (self.cumulative_cost_usd + estimated_cost) > max_workflow
        ):
            remaining = max_workflow - self.cumulative_cost_usd
            raise BudgetExceededError(
                f"Node {node_id}: estimated cost ${estimated_cost:.4f} "
                f"would exceed workflow limit. Remaining: ${remaining:.4f}"
            )

    def record_cost(self, node_id: str, cost: float) -> None:
        """Record actual cost for a node.

        Costs recorded more than once for the same node are summed, so the
        per-node totals always add up to ``cumulative_cost_usd``.

        Args:
            node_id: Node identifier the cost belongs to
            cost: Actual cost in USD to add
        """
        self.node_costs[node_id] = self.node_costs.get(node_id, 0.0) + cost
        self.cumulative_cost_usd += cost