Source code for models.mcp_model

"""
Script
------
mcp_model.py

Path
----
models/mcp_model.py

Purpose
-------
Base class for MCP-based model providers: Handle subprocess lifecycle and JSON-RPC communication.

Provides unified interface to MCP servers (stdio-based) with automatic initialization,
error handling, and response normalization to match AnthropicModel.call() interface.

Inputs
------
provider (str): Provider name (e.g., "anthropic_mcp")
model_name (str): Model identifier
server_script (str): Path to MCP server script
api_key (str, optional): API key for the provider

Outputs
-------
Dictionary: {output, model, tokens_used, provider}

Assumptions
-----------
- MCP server script exists and is executable
- Server implements standard MCP protocol (initialize, tools/call)
- run_with_env.sh wrapper is available in mcp-server/

Failure Modes
-------------
- Process spawn fails → RuntimeError
- MCP server crashes → RuntimeError (EOF on stdout)
- Invalid JSON response → RuntimeError (raised in place of json.JSONDecodeError)

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created
-------
2026-02-17

Last Edited
-----------
2026-02-17
"""

import json
import os
import subprocess
from pathlib import Path
from typing import Any

from utils.credential_redactor import redact


[docs] class MCPModel: """Base class for MCP-based model providers.""" TEMPERATURE_DEFAULT = 0.00000073 # Minimize hallucinations (match AnthropicModel)
[docs] def __init__( self, provider: str, model_name: str, server_script: str, api_key: str | None = None, ): """ Initialize MCP model. Args: provider: Provider name (e.g., "anthropic_mcp") model_name: Model identifier (e.g., "claude-opus-4-6") server_script: Path to MCP server script (relative to repo root) api_key: Optional API key (else reads from environment) """ self.provider = provider self.model_name = model_name self.server_script = server_script self.api_key = api_key self.process = None self._request_id = 0 self._initialized = False self._api_key_missing_error = None # Check if API key is missing for providers that require it if not api_key and provider not in ["ollama_mcp"]: base_provider = provider.replace("_mcp", "") self._api_key_missing_error = ( f"API key for '{base_provider}' not found. " f"Run: hillstar config\n" f"See: https://github.com/julen-gcs/agentic-orchestrator#provider-setup" )
def _ensure_process(self) -> None: """Spawn MCP server subprocess if not running.""" if self.process is not None and self.process.poll() is None: # Process still running return # Prepare environment env = os.environ.copy() # Set provider-specific API key if provided if self.api_key: env_var_map = { "anthropic": "ANTHROPIC_API_KEY", "openai": "OPENAI_API_KEY", "mistral": "MISTRAL_API_KEY", "google_ai_studio": "GOOGLE_API_KEY", } base_provider = self.provider.replace("_mcp", "") env_var_name = env_var_map.get(base_provider) if env_var_name: env[env_var_name] = self.api_key # Determine repo root (mcp_model.py is in models/, so go up 2 levels) repo_root = Path(__file__).parent.parent # Spawn subprocess using run_with_env.sh wrapper try: self.process = subprocess.Popen( [ "bash", str(repo_root / "mcp-server" / "run_with_env.sh"), str(repo_root / self.server_script), ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, # Line-buffered mode cwd=str(repo_root), env=env, ) except Exception as e: raise RuntimeError(f"Failed to spawn MCP server: {e}") # Send initialize request self._send_initialize() self._initialized = True def _send_initialize(self) -> None: """Send JSON-RPC initialize request.""" request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "hillstar", "version": "1.0"}, }, } response = self._send_request(request) if not response or response.get("isError"): raise RuntimeError(f"MCP initialization failed: {response}") def _send_request(self, request: dict) -> dict: """Send JSON-RPC request and read response. 
Args: request: JSON-RPC request dict Returns: JSON-RPC response dict (with result or error) """ if not self.process: raise RuntimeError("MCP process not initialized") # Type guards: streams are guaranteed non-None by PIPE configuration assert self.process.stdin is not None, "stdin should not be None" assert self.process.stdout is not None, "stdout should not be None" assert self.process.stderr is not None, "stderr should not be None" try: # Send request self.process.stdin.write(json.dumps(request) + "\n") self.process.stdin.flush() # Read response response_line = self.process.stdout.readline() if not response_line: # EOF - server crashed or closed stderr = self.process.stderr.read() # Redact any credentials from stderr before including in error stderr = redact(stderr) raise RuntimeError( f"MCP server closed connection. Error output: {stderr}" ) return json.loads(response_line) except json.JSONDecodeError as e: raise RuntimeError(f"Invalid JSON from MCP server: {e}") except Exception as e: raise RuntimeError(f"MCP communication error: {e}")
[docs] def call( self, prompt: str, max_tokens: int = 4096, temperature: float | None = None, system: str | None = None, ) -> dict[str, Any]: """Execute task via MCP server. Matches AnthropicModel.call() interface for compatibility. Args: prompt: Input prompt max_tokens: Maximum tokens to generate temperature: Sampling temperature (unused for MCP servers) system: System prompt (unused for MCP servers) Returns: Dictionary with response and metadata """ if temperature is None: temperature = self.TEMPERATURE_DEFAULT # Check for missing API key before attempting to spawn process if self._api_key_missing_error: return { "output": None, "error": self._api_key_missing_error, "provider": self.provider, } try: # Ensure process is running self._ensure_process() # Increment request ID self._request_id += 1 # Build task prompt task_prompt = prompt if system: task_prompt = f"{system}\n\n{prompt}" # Send tools/call request request = { "jsonrpc": "2.0", "id": self._request_id + 1, # Offset from init request "method": "tools/call", "params": { "name": "execute_task", "arguments": { "prompt": task_prompt, "model": self.model_name, }, }, } response = self._send_request(request) result = response.get("result", {}) # Check for errors if result.get("isError"): error_text = "Unknown error" if result.get("content"): error_text = result["content"][0].get("text", error_text) return { "output": None, "error": redact(error_text), "provider": self.provider, } # Extract output output = "No output" if result.get("content"): output = result["content"][0].get("text", output) return { "output": output, "model": self.model_name, "tokens_used": 0, # MCP servers don't return token counts "provider": self.provider, } except Exception as e: # Sanitize error message (don't expose internal details or API keys) error_msg = str(e) # First, redact any credentials that might be in the error error_msg = redact(error_msg) # Then, provide helpful message for common errors if "API key" in error_msg or 
"ANTHROPIC_API_KEY" in error_msg: error_msg = "API authentication failed. Check your credentials with: hillstar config --show" elif "MCP" in error_msg or "subprocess" in error_msg.lower(): error_msg = f"Failed to connect to {self.provider} provider. Please try again." return { "output": None, "error": error_msg, "provider": self.provider, }
[docs] def __del__(self): """Cleanup subprocess on deletion.""" if self.process and self.process.stdin: try: self.process.stdin.close() self.process.wait(timeout=2) except Exception: pass