"""
Script
------
config.py
Purpose
-------
Unified configuration management for Hillstar.
Handles:
- Loading default registry from provider_registry.default.json
- Merging user overrides from user config
- Validating provider configurations against registry schema
- Compliance checks for provider configurations
- Managing user-level API keys and settings
Author: Julen Gamboa <julen.gamboa.ds@gmail.com>
Created
-------
2026-02-07
Last Edited
-----------
2026-02-22
"""
import json
import os
from pathlib import Path
from typing import Any, Optional
from .provider_registry import ProviderRegistry
[docs]
class HillstarConfig:
    """
    Unified configuration management for Hillstar.

    Combines registry-based provider configuration with user-level
    API key management. Provides methods for:

    - Loading and merging configurations
    - Validating provider configurations
    - Managing user API keys and settings
    - Checking compliance requirements
    """

    USER_CONFIG_DIR = Path.home() / ".hillstar"
    USER_CONFIG_FILE = USER_CONFIG_DIR / "provider_registry.json"
    USER_OVERRIDE_PATH = USER_CONFIG_FILE

    # Provider names checked by list_missing_providers() when the caller
    # does not supply an explicit list.
    DEFAULT_PROVIDERS = (
        "anthropic",
        "openai",
        "google_ai_studio",
        "mistral",
        "ollama",
        "devstral_local",
        "anthropic_ollama",
    )

    def __init__(self):
        """Initialize HillstarConfig with user and default configurations."""
        self.registry = ProviderRegistry()
        self.user_config: dict[str, Any] = self._load_or_init_config()

    # ===== User Configuration Management =====

    def set_provider_key(self, provider: str, api_key: str) -> None:
        """
        Store API key for a provider.

        Other settings already stored for the provider are preserved;
        only the ``api_key`` field is added or replaced.

        Args:
            provider: Provider name (e.g., 'anthropic', 'openai')
            api_key: API key value

        Raises:
            ValueError: If provider name or api_key is empty
        """
        if not provider or not api_key:
            raise ValueError("Provider name and API key cannot be empty")

        providers = self.user_config.get("providers")
        if not isinstance(providers, dict):
            # Missing or malformed section: start a fresh mapping.
            providers = {}
            self.user_config["providers"] = providers

        entry = providers.get(provider)
        if isinstance(entry, dict):
            # Update in place so sibling settings are not dropped.
            entry["api_key"] = api_key
        else:
            providers[provider] = {"api_key": api_key}

    def get_provider_key(self, provider: str) -> Optional[str]:
        """
        Retrieve API key for a provider.

        Args:
            provider: Provider name

        Returns:
            API key if configured, None otherwise
        """
        entry = self._user_providers().get(provider)
        if not isinstance(entry, dict):
            return None
        return entry.get("api_key")

    def list_configured_providers(self) -> list[str]:
        """
        List providers that have an API key stored in the user config.

        Returns:
            Sorted list of provider names whose entry holds a non-empty
            ``api_key``
        """
        return sorted(
            name
            for name, entry in self._user_providers().items()
            if isinstance(entry, dict) and entry.get("api_key")
        )

    def list_missing_providers(
        self, all_providers: Optional[list[str]] = None
    ) -> list[str]:
        """
        List providers not yet configured.

        Args:
            all_providers: List of provider names to check against.
                If None, uses DEFAULT_PROVIDERS.

        Returns:
            Sorted list of provider names without keys configured
        """
        candidates = (
            self.DEFAULT_PROVIDERS if all_providers is None else all_providers
        )
        configured = set(self.list_configured_providers())
        return sorted(name for name in candidates if name not in configured)

    def validate_key(self, provider: str, api_key: str) -> bool:
        """
        Validate that an API key is non-empty and reasonably formatted.

        This is basic validation (non-empty, reasonable length, no
        whitespace). Full validation (API call) deferred to runtime.

        Args:
            provider: Provider name (currently not used by the checks)
            api_key: API key to validate

        Returns:
            True if key passes basic validation, False otherwise
        """
        if not isinstance(api_key, str) or not api_key:
            return False
        # Reject any whitespace (spaces, tabs, newlines), not just ' ':
        # a pasted key with a trailing newline must not be accepted.
        if any(ch.isspace() for ch in api_key):
            return False
        # Reasonable minimum length (>= 8 chars).
        return len(api_key) >= 8

    def save_config(self) -> None:
        """
        Write configuration to the user config file.

        The target is ``USER_CONFIG_FILE`` (by default
        ~/.hillstar/provider_registry.json). Creates the directory if it
        doesn't exist. Only the ``providers`` section of the in-memory
        config is written, together with fixed version/description
        metadata.

        Raises:
            IOError: If unable to write file
        """
        config_to_save = {
            "version": "1.0.0",
            "description": "User provider configuration",
            "providers": self._user_providers(),
        }
        try:
            self.USER_CONFIG_DIR.mkdir(parents=True, exist_ok=True)
            with open(self.USER_CONFIG_FILE, "w", encoding="utf-8") as f:
                json.dump(config_to_save, f, indent=2)
        except IOError as e:
            msg = f"Failed to save config to {self.USER_CONFIG_FILE}: {e}"
            raise IOError(msg) from e

        # The file holds API keys: restrict it to the owner. Best effort
        # only, since some platforms/filesystems do not support POSIX
        # permission bits and the data has already been written.
        try:
            os.chmod(self.USER_CONFIG_FILE, 0o600)
        except OSError:
            pass

    def load_config(self) -> None:
        """
        Load configuration from the user config file.

        Reads ``USER_CONFIG_FILE`` (by default
        ~/.hillstar/provider_registry.json) into ``self.user_config``.
        Creates empty config if file doesn't exist.

        Raises:
            ValueError: If the file is not valid JSON or its top-level
                value is not a JSON object
        """
        if not self.USER_CONFIG_FILE.exists():
            self.user_config = {"providers": {}}
            return

        try:
            with open(self.USER_CONFIG_FILE, encoding="utf-8") as f:
                loaded = json.load(f)
        except json.JSONDecodeError as e:
            msg = f"Invalid JSON in {self.USER_CONFIG_FILE}: {e}"
            raise ValueError(msg) from e

        if not isinstance(loaded, dict):
            # The rest of the class indexes user_config as a mapping.
            msg = (
                f"Invalid config in {self.USER_CONFIG_FILE}: expected a "
                f"JSON object, got {type(loaded).__name__}"
            )
            raise ValueError(msg)
        self.user_config = loaded

    # ===== Registry-Based Configuration =====

    def get_merged_registry(self) -> "ProviderRegistry":
        """
        Return the provider registry held by this instance.

        NOTE(review): no merging happens in this method; it returns
        ``self.registry`` unchanged. Presumably ProviderRegistry applies
        user overrides itself when constructed -- confirm.
        """
        return self.registry

    def validate_provider_config(
        self,
        provider: str,
        config: dict[str, Any],
    ) -> list[str]:
        """
        Validate provider configuration against registry.

        Checks, in order: required env vars (present either as a key in
        ``config`` or as a non-empty environment variable), the endpoint
        scheme recorded in the registry for ``cloud_api`` providers, and
        that ``config['model']`` (if given) is a model known to the
        registry.

        Args:
            provider: Provider name
            config: Provider configuration dict

        Returns:
            List of validation error messages (empty if valid or if the
            provider is not in the registry)
        """
        errors: list[str] = []

        provider_schema = self.registry.get_provider(provider)
        if not provider_schema:
            # Unknown provider - skip validation
            return errors

        # Check required env vars are configured
        for env_var in provider_schema.get("env_vars", []):
            if env_var not in config and not os.getenv(env_var):
                errors.append(
                    f"Provider '{provider}' requires '{env_var}' "
                    f"env var or config key"
                )

        # Validate endpoint format for non-local providers. The endpoint
        # inspected is the one from the registry schema, not from config.
        if provider_schema.get("type", "") == "cloud_api":
            endpoint = provider_schema.get("endpoint", "")
            if endpoint and not endpoint.startswith(("http://", "https://")):
                errors.append(
                    f"Provider '{provider}' has invalid endpoint: {endpoint}"
                )

        # Validate model is in registry
        model_name = config.get("model")
        if model_name:
            models = provider_schema.get("models", {})
            if models and model_name not in models:
                valid = list(models.keys())
                # Show at most five valid names to keep the message short.
                errors.append(
                    f"Unknown model '{model_name}' for provider "
                    f"'{provider}'. Valid: {', '.join(valid[:5])}"
                    f"{'...' if len(valid) > 5 else ''}"
                )

        return errors

    def check_compliance(
        self,
        provider: str,
        config: dict[str, Any],
    ) -> tuple[bool, list[str]]:
        """
        Check compliance requirements for a provider.

        Args:
            provider: Provider name
            config: Provider configuration

        Returns:
            (is_compliant: bool, issues: List[str]). Providers that are
            unknown to the registry, or that declare no compliance
            section, are reported as compliant.
        """
        issues: list[str] = []

        provider_config = self.registry.get_provider(provider)
        if not provider_config:
            return True, issues
        compliance = provider_config.get("compliance", {})
        if not compliance:
            return True, issues

        # Check ToS acceptance requirement
        if compliance.get("requires_tos_acceptance", False):
            if not config.get("tos_accepted", False):
                tos_url = compliance.get("tos_url", "N/A")
                issues.append(
                    f"Provider '{provider}' requires ToS acceptance. "
                    f"See: {tos_url}"
                )

        # Check audit requirement
        audit_required = compliance.get("audit_required", False)
        if audit_required and not config.get("audit_enabled", False):
            issues.append(
                f"Provider '{provider}' requires audit logging "
                f"to be enabled"
            )

        # Check data residency requirements.
        # NOTE(review): this entry is added regardless of `config`, so a
        # provider with non-local data residency is always reported as
        # non-compliant. It reads like an informational notice -- confirm
        # whether it should affect the boolean result.
        allowed_regions = compliance.get("data_residency", [])
        if allowed_regions and "local" not in allowed_regions:
            issues.append(
                f"Provider '{provider}' processes data in: "
                f"{', '.join(allowed_regions)}"
            )

        return len(issues) == 0, issues

    def get_provider_info(
        self, provider: str
    ) -> Optional[dict[str, Any]]:
        """Get full provider configuration from registry."""
        return self.registry.get_provider(provider)

    def list_available_providers(self) -> list[str]:
        """List all available providers from registry."""
        return self.registry.list_providers()

    def list_available_models(self, provider: str) -> list[str]:
        """
        List all available models for a provider.

        Returns an empty list when the provider is not in the registry.
        """
        provider_config = self.registry.get_provider(provider)
        if not provider_config:
            return []
        return list(provider_config.get("models", {}).keys())

    def merge_configs(
        self,
        user_config: dict[str, Any],
        workflow_config: dict[str, Any],
    ) -> dict[str, Any]:
        """
        Merge user configuration with workflow configuration.

        Workflow configuration takes precedence over user config. The
        merge is shallow: a top-level key present in both is taken
        wholesale from ``workflow_config``. Neither input is modified.

        Args:
            user_config: User provider configuration overrides
            workflow_config: Workflow-specific model configuration

        Returns:
            Merged configuration dictionary
        """
        # Start with user config as base
        merged = user_config.copy()
        # Override with workflow config (workflow takes precedence)
        merged.update(workflow_config)
        return merged

    # ===== Internal Methods =====

    def _user_providers(self) -> dict[str, Any]:
        """Return the user config's providers mapping, or {} if absent/malformed."""
        if not isinstance(self.user_config, dict):
            return {}
        providers = self.user_config.get("providers")
        return providers if isinstance(providers, dict) else {}

    def _load_or_init_config(self) -> dict[str, Any]:
        """
        Load existing config or initialize empty one.

        Unlike load_config(), this never raises for a broken file: an
        unreadable, undecodable, non-JSON or non-object config falls back
        to an empty configuration so that construction always succeeds.
        """
        if not self.USER_CONFIG_FILE.exists():
            return {"providers": {}}
        try:
            with open(self.USER_CONFIG_FILE, encoding="utf-8") as f:
                loaded = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return {"providers": {}}
        if not isinstance(loaded, dict):
            return {"providers": {}}
        return loaded