feat(mcp): Add robust error handling and timeout configuration

Critical improvements to MCP server reliability and client experience:

Error Handling:
- Created MCPErrorFormatter for consistent error responses across all tools
- Provides structured errors with type, message, details, and actionable suggestions
- Helps clients (like Claude Code) understand and handle failures gracefully
- Categorizes errors (connection_timeout, validation_error, etc.) for better debugging

Timeout Configuration:
- Centralized timeout config with environment variable support
- Different timeouts for regular operations vs polling operations
- Configurable via MCP_REQUEST_TIMEOUT, MCP_CONNECT_TIMEOUT, etc.
- Prevents indefinite hangs when services are unavailable

Module Registration:
- Distinguishes between ImportError (acceptable) and code errors (must fix)
- SyntaxError/NameError/AttributeError now halt execution immediately
- Prevents broken code from silently failing in production

Polling Safety:
- Fixed project creation polling with exponential backoff
- Handles API unavailability with proper error messages
- Maximum attempts configurable via MCP_MAX_POLLING_ATTEMPTS

Response Normalization:
- Fixed inconsistent response handling in list_tasks
- Validates and normalizes different API response formats
- Clear error messages when response format is unexpected

These changes address critical issues from PR review while maintaining
backward compatibility. All 20 existing tests pass.
This commit is contained in:
Rasmus Widing 2025-08-19 15:38:13 +03:00
parent 307e0e3b71
commit cf3d7b17fe
7 changed files with 441 additions and 56 deletions

View File

@ -14,6 +14,13 @@ from urllib.parse import urljoin
import httpx
from mcp.server.fastmcp import Context, FastMCP
from src.mcp_server.utils.error_handling import MCPErrorFormatter
from src.mcp_server.utils.timeout_config import (
get_default_timeout,
get_max_polling_attempts,
get_polling_interval,
get_polling_timeout,
)
from src.server.config.service_discovery import get_api_url
logger = logging.getLogger(__name__)
@ -65,7 +72,7 @@ def register_project_tools(mcp: FastMCP):
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
@ -78,40 +85,71 @@ def register_project_tools(mcp: FastMCP):
# Handle async project creation
if "progress_id" in result:
# Poll for completion (max 30 seconds)
for attempt in range(30):
await asyncio.sleep(1)
# List projects to find the newly created one
list_response = await client.get(urljoin(api_url, "/api/projects"))
if list_response.status_code == 200:
projects = list_response.json()
# Find project with matching title created recently
for proj in projects:
if proj.get("title") == title:
return json.dumps({
"success": True,
"project": proj,
"project_id": proj["id"],
"message": f"Project created successfully with ID: {proj['id']}",
})
# Poll for completion with proper error handling and backoff
max_attempts = get_max_polling_attempts()
polling_timeout = get_polling_timeout()
for attempt in range(max_attempts):
try:
# Exponential backoff
sleep_interval = get_polling_interval(attempt)
await asyncio.sleep(sleep_interval)
# Create new client with polling timeout
async with httpx.AsyncClient(timeout=polling_timeout) as poll_client:
list_response = await poll_client.get(urljoin(api_url, "/api/projects"))
list_response.raise_for_status() # Raise on HTTP errors
projects = list_response.json()
# Find project with matching title created recently
for proj in projects:
if proj.get("title") == title:
return json.dumps({
"success": True,
"project": proj,
"project_id": proj["id"],
"message": f"Project created successfully with ID: {proj['id']}",
})
except httpx.RequestError as poll_error:
logger.warning(f"Polling attempt {attempt + 1}/{max_attempts} failed: {poll_error}")
if attempt == max_attempts - 1: # Last attempt
return MCPErrorFormatter.format_error(
error_type="polling_timeout",
message=f"Project creation polling failed after {max_attempts} attempts",
details={
"progress_id": result["progress_id"],
"title": title,
"last_error": str(poll_error),
},
suggestion="The project may still be creating. Use list_projects to check status",
)
except Exception as poll_error:
logger.warning(f"Unexpected error during polling attempt {attempt + 1}: {poll_error}")
# If we couldn't find it after polling
return json.dumps({
"success": True,
"progress_id": result["progress_id"],
"message": "Project creation started. Use list_projects to find it once complete.",
"message": f"Project creation in progress after {max_attempts} checks. Use list_projects to find it once complete.",
})
else:
# Direct response (shouldn't happen with current API)
return json.dumps({"success": True, "project": result})
else:
error_detail = response.json().get("detail", {}).get("error", "Unknown error")
return json.dumps({"success": False, "error": error_detail})
return MCPErrorFormatter.from_http_error(response, "create project")
except httpx.ConnectError as e:
return MCPErrorFormatter.from_exception(
e, "create project", {"title": title, "api_url": api_url}
)
except httpx.TimeoutException as e:
return MCPErrorFormatter.from_exception(
e, "create project", {"title": title, "timeout": str(timeout)}
)
except Exception as e:
logger.error(f"Error creating project: {e}")
return json.dumps({"success": False, "error": str(e)})
logger.error(f"Error creating project: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "create project", {"title": title})
@mcp.tool()
async def list_projects(ctx: Context) -> str:
@ -126,7 +164,7 @@ def register_project_tools(mcp: FastMCP):
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(urljoin(api_url, "/api/projects"))
@ -139,11 +177,13 @@ def register_project_tools(mcp: FastMCP):
"count": len(projects),
})
else:
return json.dumps({"success": False, "error": "Failed to list projects"})
return MCPErrorFormatter.from_http_error(response, "list projects")
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(e, "list projects", {"api_url": api_url})
except Exception as e:
logger.error(f"Error listing projects: {e}")
return json.dumps({"success": False, "error": str(e)})
logger.error(f"Error listing projects: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "list projects")
@mcp.tool()
async def get_project(ctx: Context, project_id: str) -> str:

View File

@ -13,6 +13,8 @@ from urllib.parse import urljoin
import httpx
from mcp.server.fastmcp import Context, FastMCP
from src.mcp_server.utils.error_handling import MCPErrorFormatter
from src.mcp_server.utils.timeout_config import get_default_timeout
from src.server.config.service_discovery import get_api_url
logger = logging.getLogger(__name__)
@ -110,7 +112,7 @@ def register_task_tools(mcp: FastMCP):
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
@ -174,7 +176,7 @@ def register_task_tools(mcp: FastMCP):
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
timeout = get_default_timeout()
# Build URL and parameters based on filter type
params: Dict[str, Any] = {
@ -207,31 +209,49 @@ def register_task_tools(mcp: FastMCP):
result = response.json()
# Handle both direct array and paginated response formats
# Normalize response format - handle both array and object responses
if isinstance(result, list):
# Direct array response
tasks = result
pagination_info = None
else:
total_count = len(result)
elif isinstance(result, dict):
# Object response - check for standard fields
if "tasks" in result:
tasks = result.get("tasks", [])
pagination_info = result.get("pagination", {})
tasks = result["tasks"]
total_count = result.get("total_count", len(tasks))
elif "data" in result:
# Alternative format with 'data' field
tasks = result["data"]
total_count = result.get("total", len(tasks))
else:
tasks = result if isinstance(result, list) else []
pagination_info = None
# Unknown object format
return MCPErrorFormatter.format_error(
error_type="invalid_response",
message="Unexpected response format from API",
details={"response_keys": list(result.keys())},
suggestion="The API response format may have changed. Please check for updates.",
)
else:
# Completely unexpected format
return MCPErrorFormatter.format_error(
error_type="invalid_response",
message="Invalid response type from API",
details={"response_type": type(result).__name__},
suggestion="Expected list or object, got different type.",
)
return json.dumps({
"success": True,
"tasks": tasks,
"pagination": pagination_info,
"total_count": len(tasks)
if pagination_info is None
else pagination_info.get("total", len(tasks)),
"total_count": total_count,
"count": len(tasks),
})
except httpx.RequestError as e:
return MCPErrorFormatter.from_exception(e, "list tasks", {"filter_by": filter_by, "filter_value": filter_value})
except Exception as e:
logger.error(f"Error listing tasks: {e}")
return json.dumps({"success": False, "error": str(e)})
logger.error(f"Error listing tasks: {e}", exc_info=True)
return MCPErrorFormatter.from_exception(e, "list tasks")
@mcp.tool()
async def get_task(ctx: Context, task_id: str) -> str:
@ -249,7 +269,7 @@ def register_task_tools(mcp: FastMCP):
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(urljoin(api_url, f"/api/tasks/{task_id}"))
@ -288,7 +308,7 @@ def register_task_tools(mcp: FastMCP):
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.put(
@ -334,7 +354,7 @@ def register_task_tools(mcp: FastMCP):
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
timeout = get_default_timeout()
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.delete(urljoin(api_url, f"/api/tasks/{task_id}"))

View File

@ -384,10 +384,18 @@ def register_modules():
modules_registered += 1
logger.info("✓ Project tools registered")
except ImportError as e:
logger.warning(f"⚠ Project tools not available: {e}")
except Exception as e:
logger.error(f"✗ Error registering project tools: {e}")
# Module not found - this is acceptable in modular architecture
logger.warning(f"⚠ Project tools module not available (optional): {e}")
except (SyntaxError, NameError, AttributeError) as e:
# Code errors that should not be ignored
logger.error(f"✗ Code error in project tools - MUST FIX: {e}")
logger.error(traceback.format_exc())
raise # Re-raise to prevent running with broken code
except Exception as e:
# Unexpected errors during registration
logger.error(f"✗ Failed to register project tools: {e}")
logger.error(traceback.format_exc())
# Don't raise - allow other modules to register
# Task Management Tools
try:
@ -397,9 +405,13 @@ def register_modules():
modules_registered += 1
logger.info("✓ Task tools registered")
except ImportError as e:
logger.warning(f"⚠ Task tools not available: {e}")
logger.warning(f"⚠ Task tools module not available (optional): {e}")
except (SyntaxError, NameError, AttributeError) as e:
logger.error(f"✗ Code error in task tools - MUST FIX: {e}")
logger.error(traceback.format_exc())
raise
except Exception as e:
logger.error(f"✗ Error registering task tools: {e}")
logger.error(f"Failed to register task tools: {e}")
logger.error(traceback.format_exc())
# Document Management Tools
@ -410,9 +422,13 @@ def register_modules():
modules_registered += 1
logger.info("✓ Document tools registered")
except ImportError as e:
logger.warning(f"⚠ Document tools not available: {e}")
logger.warning(f"⚠ Document tools module not available (optional): {e}")
except (SyntaxError, NameError, AttributeError) as e:
logger.error(f"✗ Code error in document tools - MUST FIX: {e}")
logger.error(traceback.format_exc())
raise
except Exception as e:
logger.error(f"✗ Error registering document tools: {e}")
logger.error(f"Failed to register document tools: {e}")
logger.error(traceback.format_exc())
# Version Management Tools
@ -423,9 +439,13 @@ def register_modules():
modules_registered += 1
logger.info("✓ Version tools registered")
except ImportError as e:
logger.warning(f"⚠ Version tools not available: {e}")
logger.warning(f"⚠ Version tools module not available (optional): {e}")
except (SyntaxError, NameError, AttributeError) as e:
logger.error(f"✗ Code error in version tools - MUST FIX: {e}")
logger.error(traceback.format_exc())
raise
except Exception as e:
logger.error(f"✗ Error registering version tools: {e}")
logger.error(f"Failed to register version tools: {e}")
logger.error(traceback.format_exc())
# Feature Management Tools
@ -436,9 +456,13 @@ def register_modules():
modules_registered += 1
logger.info("✓ Feature tools registered")
except ImportError as e:
logger.warning(f"⚠ Feature tools not available: {e}")
logger.warning(f"⚠ Feature tools module not available (optional): {e}")
except (SyntaxError, NameError, AttributeError) as e:
logger.error(f"✗ Code error in feature tools - MUST FIX: {e}")
logger.error(traceback.format_exc())
raise
except Exception as e:
logger.error(f"✗ Error registering feature tools: {e}")
logger.error(f"Failed to register feature tools: {e}")
logger.error(traceback.format_exc())
logger.info(f"📦 Total modules registered: {modules_registered}")

View File

@ -0,0 +1,21 @@
"""
Utility modules for MCP Server.
"""
from .error_handling import MCPErrorFormatter
from .http_client import get_http_client
from .timeout_config import (
get_default_timeout,
get_max_polling_attempts,
get_polling_interval,
get_polling_timeout,
)
__all__ = [
"MCPErrorFormatter",
"get_http_client",
"get_default_timeout",
"get_polling_timeout",
"get_max_polling_attempts",
"get_polling_interval",
]

View File

@ -0,0 +1,166 @@
"""
Centralized error handling utilities for MCP Server.
Provides consistent error formatting and helpful context for clients.
"""
import json
import logging
from typing import Any, Dict, Optional
import httpx
logger = logging.getLogger(__name__)
class MCPErrorFormatter:
"""Formats errors consistently for MCP clients."""
@staticmethod
def format_error(
error_type: str,
message: str,
details: Optional[Dict[str, Any]] = None,
suggestion: Optional[str] = None,
http_status: Optional[int] = None,
) -> str:
"""
Format an error response with consistent structure.
Args:
error_type: Category of error (e.g., "connection_error", "validation_error")
message: User-friendly error message
details: Additional context about the error
suggestion: Actionable suggestion for resolving the error
http_status: HTTP status code if applicable
Returns:
JSON string with structured error information
"""
error_response: Dict[str, Any] = {
"success": False,
"error": {
"type": error_type,
"message": message,
},
}
if details:
error_response["error"]["details"] = details
if suggestion:
error_response["error"]["suggestion"] = suggestion
if http_status:
error_response["error"]["http_status"] = http_status
return json.dumps(error_response)
@staticmethod
def from_http_error(response: httpx.Response, operation: str) -> str:
"""
Format error from HTTP response.
Args:
response: The HTTP response object
operation: Description of what operation was being performed
Returns:
Formatted error JSON string
"""
# Try to extract error from response body
try:
body = response.json()
if isinstance(body, dict):
# Look for common error fields
error_message = (
body.get("detail", {}).get("error")
or body.get("error")
or body.get("message")
or body.get("detail")
)
if error_message:
return MCPErrorFormatter.format_error(
error_type="api_error",
message=f"Failed to {operation}: {error_message}",
details={"response_body": body},
http_status=response.status_code,
suggestion=_get_suggestion_for_status(response.status_code),
)
except Exception:
pass # Fall through to generic error
# Generic error based on status code
return MCPErrorFormatter.format_error(
error_type="http_error",
message=f"Failed to {operation}: HTTP {response.status_code}",
details={"response_text": response.text[:500]}, # Limit response text
http_status=response.status_code,
suggestion=_get_suggestion_for_status(response.status_code),
)
@staticmethod
def from_exception(exception: Exception, operation: str, context: Optional[Dict[str, Any]] = None) -> str:
"""
Format error from exception.
Args:
exception: The exception that occurred
operation: Description of what operation was being performed
context: Additional context about when the error occurred
Returns:
Formatted error JSON string
"""
error_type = "unknown_error"
suggestion = None
# Categorize common exceptions
if isinstance(exception, httpx.ConnectTimeout):
error_type = "connection_timeout"
suggestion = "Check if the Archon server is running and accessible at the configured URL"
elif isinstance(exception, httpx.ReadTimeout):
error_type = "read_timeout"
suggestion = "The operation is taking longer than expected. Try again or check server logs"
elif isinstance(exception, httpx.ConnectError):
error_type = "connection_error"
suggestion = "Ensure the Archon server is running on the correct port"
elif isinstance(exception, httpx.RequestError):
error_type = "request_error"
suggestion = "Check network connectivity and server configuration"
elif isinstance(exception, ValueError):
error_type = "validation_error"
suggestion = "Check that all input parameters are valid"
elif isinstance(exception, KeyError):
error_type = "missing_data"
suggestion = "The response format may have changed. Check for API updates"
details: Dict[str, Any] = {"exception_type": type(exception).__name__, "exception_message": str(exception)}
if context:
details["context"] = context
return MCPErrorFormatter.format_error(
error_type=error_type,
message=f"Failed to {operation}: {str(exception)}",
details=details,
suggestion=suggestion,
)
def _get_suggestion_for_status(status_code: int) -> Optional[str]:
"""Get helpful suggestion based on HTTP status code."""
suggestions = {
400: "Check that all required parameters are provided and valid",
401: "Authentication may be required. Check API credentials",
403: "You may not have permission for this operation",
404: "The requested resource was not found. Verify the ID is correct",
409: "There's a conflict with the current state. The resource may already exist",
422: "The request format is correct but the data is invalid",
429: "Too many requests. Please wait before retrying",
500: "Server error. Check server logs for details",
502: "The backend service may be down. Check if all services are running",
503: "Service temporarily unavailable. Try again later",
504: "The operation timed out. The server may be overloaded",
}
return suggestions.get(status_code)

View File

@ -0,0 +1,38 @@
"""
HTTP client utilities for MCP Server.
Provides consistent HTTP client configuration.
"""
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional
import httpx
from .timeout_config import get_default_timeout, get_polling_timeout
@asynccontextmanager
async def get_http_client(
timeout: Optional[httpx.Timeout] = None, for_polling: bool = False
) -> AsyncIterator[httpx.AsyncClient]:
"""
Create an HTTP client with consistent configuration.
Args:
timeout: Optional custom timeout. If not provided, uses defaults.
for_polling: If True, uses polling-specific timeout configuration.
Yields:
Configured httpx.AsyncClient
Example:
async with get_http_client() as client:
response = await client.get(url)
"""
if timeout is None:
timeout = get_polling_timeout() if for_polling else get_default_timeout()
# Future: Could add retry logic, custom headers, etc. here
async with httpx.AsyncClient(timeout=timeout) as client:
yield client

View File

@ -0,0 +1,76 @@
"""
Centralized timeout configuration for MCP Server.
Provides consistent timeout values across all tools.
"""
import os
from typing import Optional
import httpx
def get_default_timeout() -> httpx.Timeout:
"""
Get default timeout configuration from environment or defaults.
Environment variables:
- MCP_REQUEST_TIMEOUT: Total request timeout in seconds (default: 30)
- MCP_CONNECT_TIMEOUT: Connection timeout in seconds (default: 5)
- MCP_READ_TIMEOUT: Read timeout in seconds (default: 20)
- MCP_WRITE_TIMEOUT: Write timeout in seconds (default: 10)
Returns:
Configured httpx.Timeout object
"""
return httpx.Timeout(
timeout=float(os.getenv("MCP_REQUEST_TIMEOUT", "30.0")),
connect=float(os.getenv("MCP_CONNECT_TIMEOUT", "5.0")),
read=float(os.getenv("MCP_READ_TIMEOUT", "20.0")),
write=float(os.getenv("MCP_WRITE_TIMEOUT", "10.0")),
)
def get_polling_timeout() -> httpx.Timeout:
"""
Get timeout configuration for polling operations.
Polling operations may need longer timeouts.
Returns:
Configured httpx.Timeout object for polling
"""
return httpx.Timeout(
timeout=float(os.getenv("MCP_POLLING_TIMEOUT", "60.0")),
connect=float(os.getenv("MCP_CONNECT_TIMEOUT", "5.0")),
read=float(os.getenv("MCP_POLLING_READ_TIMEOUT", "30.0")),
write=float(os.getenv("MCP_WRITE_TIMEOUT", "10.0")),
)
def get_max_polling_attempts() -> int:
"""
Get maximum number of polling attempts.
Returns:
Maximum polling attempts (default: 30)
"""
return int(os.getenv("MCP_MAX_POLLING_ATTEMPTS", "30"))
def get_polling_interval(attempt: int) -> float:
"""
Get polling interval with exponential backoff.
Args:
attempt: Current attempt number (0-based)
Returns:
Sleep interval in seconds
"""
base_interval = float(os.getenv("MCP_POLLING_BASE_INTERVAL", "1.0"))
max_interval = float(os.getenv("MCP_POLLING_MAX_INTERVAL", "5.0"))
# Exponential backoff: 1s, 2s, 4s, 5s, 5s, ...
interval = min(base_interval * (2**attempt), max_interval)
return float(interval)