""" MCP Server for Archon (Microservices Version) This is the MCP server that uses HTTP calls to other services instead of importing heavy dependencies directly. This significantly reduces the container size from 1.66GB to ~150MB. Modules: - RAG Module: RAG queries, search, and source management via HTTP - Project Module: Task and project management via HTTP - Health & Session: Local operations Note: Crawling and document upload operations are handled directly by the API service and frontend, not through MCP tools. """ import json import logging import os import sys import threading import time import traceback from collections.abc import AsyncIterator from contextlib import asynccontextmanager from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any from dotenv import load_dotenv from mcp.server.fastmcp import Context, FastMCP from starlette.requests import Request from starlette.responses import JSONResponse # Add the project root to Python path for imports sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) # Load environment variables from the project root .env file project_root = Path(__file__).resolve().parent.parent dotenv_path = project_root / ".env" load_dotenv(dotenv_path, override=True) # Configure logging FIRST before any imports that might use it logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler("/tmp/mcp_server.log", mode="a") if os.path.exists("/tmp") else logging.NullHandler(), ], ) logger = logging.getLogger(__name__) # Import Logfire configuration from src.server.config.logfire_config import mcp_logger, setup_logfire # Import service client for HTTP calls from src.server.services.mcp_service_client import get_mcp_service_client # Import session management from src.server.services.mcp_session_manager import get_session_manager # Global initialization lock and flag 
_initialization_lock = threading.Lock()
_initialization_complete = False
_shared_context = None

server_host = "0.0.0.0"  # Listen on all interfaces

# Require ARCHON_MCP_PORT to be set (no silent default).
mcp_port = os.getenv("ARCHON_MCP_PORT")
if not mcp_port:
    raise ValueError(
        "ARCHON_MCP_PORT environment variable is required. "
        "Please set it in your .env file or environment. "
        "Default value: 8051"
    )
server_port = int(mcp_port)


@dataclass
class ArchonContext:
    """
    Context for MCP server.

    No heavy dependencies - just a service client for HTTP calls plus
    cached health/uptime bookkeeping shared across SSE connections.
    """

    # HTTP client used to talk to the API/agents services.
    service_client: Any
    # Latest health snapshot; filled with a fresh dict in __post_init__.
    health_status: dict = None
    # Process start time (epoch seconds); defaults to "now" in __post_init__.
    startup_time: float = None

    def __post_init__(self):
        # Defaults are assigned here (not as field defaults) so every
        # instance gets its own mutable dict and its own timestamp.
        if self.health_status is None:
            self.health_status = {
                "status": "healthy",
                "api_service": False,
                "agents_service": False,
                "last_health_check": None,
            }
        if self.startup_time is None:
            self.startup_time = time.time()


async def perform_health_checks(context: ArchonContext):
    """Perform health checks on dependent services via HTTP.

    Updates ``context.health_status`` in place; never raises - failures
    are recorded as status "unhealthy".
    """
    try:
        # Check dependent services
        service_health = await context.service_client.health_check()
        context.health_status["api_service"] = service_health.get("api_service", False)
        context.health_status["agents_service"] = service_health.get("agents_service", False)

        # Overall status: only the API service is critical.
        all_critical_ready = context.health_status["api_service"]
        context.health_status["status"] = "healthy" if all_critical_ready else "degraded"
        context.health_status["last_health_check"] = datetime.now().isoformat()

        if not all_critical_ready:
            logger.warning(f"Health check failed: {context.health_status}")
        else:
            logger.info("Health check passed - dependent services healthy")
    except Exception as e:
        logger.error(f"Health check error: {e}")
        context.health_status["status"] = "unhealthy"
        context.health_status["last_health_check"] = datetime.now().isoformat()


@asynccontextmanager
async def lifespan(server: FastMCP) -> AsyncIterator[ArchonContext]:
    """
    Lifecycle manager - no heavy dependencies.

    Initializes the shared context exactly once per process (double-checked
    locking); later SSE connections reuse it. The thread lock is never held
    across an ``await`` or the ``yield``: awaiting while holding it would
    let another coroutine block the event loop trying to acquire it.
    """
    global _initialization_complete, _shared_context

    # Fast path without the lock.
    if _initialization_complete and _shared_context:
        logger.info("โ™ป๏ธ Reusing existing context for new SSE connection")
        yield _shared_context
        return

    new_context = None
    with _initialization_lock:
        # Double-check pattern: another thread may have initialized while
        # we waited for the lock.
        if _initialization_complete and _shared_context:
            logger.info("โ™ป๏ธ Reusing existing context for new SSE connection")
        else:
            logger.info("๐Ÿš€ Starting MCP server...")
            try:
                # Initialize session manager (singleton side effect only).
                logger.info("๐Ÿ” Initializing session manager...")
                get_session_manager()
                logger.info("โœ“ Session manager initialized")

                # Initialize service client for HTTP calls
                logger.info("๐ŸŒ Initializing service client...")
                service_client = get_mcp_service_client()
                logger.info("โœ“ Service client initialized")

                # Create and publish the shared context.
                new_context = ArchonContext(service_client=service_client)
                _shared_context = new_context
                _initialization_complete = True
            except Exception as e:
                logger.error(f"๐Ÿ’ฅ Critical error in lifespan setup: {e}")
                logger.error(traceback.format_exc())
                raise

    if new_context is not None:
        # Initial health check runs OUTSIDE the lock - see docstring.
        await perform_health_checks(new_context)
        logger.info("โœ“ MCP server ready")

    try:
        yield _shared_context
    finally:
        # Clean up resources
        logger.info("๐Ÿงน Cleaning up MCP server...")
        logger.info("โœ… MCP server shutdown complete")


# Define MCP instructions for Claude Code and other clients
MCP_INSTRUCTIONS = """
# Archon MCP Server Instructions

## ๐Ÿšจ CRITICAL RULES (ALWAYS FOLLOW)

1. **Task Management**: ALWAYS use Archon MCP tools for task management.
   - Combine with your local TODO tools for granular tracking
2. **Research First**: Before implementing, use rag_search_knowledge_base and rag_search_code_examples
3. **Task-Driven Development**: Never code without checking current tasks first

## ๐ŸŽฏ Targeted Documentation Search

When searching specific documentation (very common!):
1. **Get available sources**: `rag_get_available_sources()` - Returns list with id, title, url
2. **Find source ID**: Match user's request to source title (e.g., "PydanticAI docs" -> find ID)
3. **Filter search**: `rag_search_knowledge_base(query="...", source_id="src_xxx", match_count=5)`

Examples:
- User: "Search the Supabase docs for vector functions"
  1. Call `rag_get_available_sources()`
  2. Find Supabase source ID from results (e.g., "src_abc123")
  3. Call `rag_search_knowledge_base(query="vector functions", source_id="src_abc123")`

- User: "Find authentication examples in the MCP documentation"
  1. Call `rag_get_available_sources()`
  2. Find MCP docs source ID
  3. Call `rag_search_code_examples(query="authentication", source_id="src_def456")`

IMPORTANT: Always use source_id (not URLs or domain names) for filtering!

## ๐Ÿ“‹ Core Workflow

### Task Management Cycle
1. **Get current task**: `list_tasks(task_id="...")`
2. **Search/List tasks**: `list_tasks(query="auth", filter_by="status", filter_value="todo")`
3. **Mark as doing**: `manage_task("update", task_id="...", status="doing")`
4. **Research phase**:
   - `rag_search_knowledge_base(query="...", match_count=5)`
   - `rag_search_code_examples(query="...", match_count=3)`
5. **Implementation**: Code based on research findings
6. **Mark for review**: `manage_task("update", task_id="...", status="review")`
7. **Get next task**: `list_tasks(filter_by="status", filter_value="todo")`

### Consolidated Task Tools (Optimized ~2 tools from 5)
- `list_tasks(query=None, task_id=None, filter_by=None, filter_value=None, per_page=10)`
  - list + search + get in one tool
  - Search with keyword query parameter (optional)
  - task_id parameter for getting single task (full details)
  - Filter by status, project, or assignee
  - **Optimized**: Returns truncated descriptions and array counts (lists only)
  - **Default**: 10 items per page (was 50)
- `manage_task(action, task_id=None, project_id=None, ...)`
  - **Consolidated**: create + update + delete in one tool
  - action: "create" | "update" | "delete"
  - Examples:
    - `manage_task("create", project_id="p-1", title="Fix auth")`
    - `manage_task("update", task_id="t-1", status="doing")`
    - `manage_task("delete", task_id="t-1")`

## ๐Ÿ—๏ธ Project Management

### Project Tools
- `list_projects(project_id=None, query=None, page=1, per_page=10)`
  - List all projects, search by query, or get specific project by ID
- `manage_project(action, project_id=None, title=None, description=None, github_repo=None)`
  - Actions: "create", "update", "delete"

### Document Tools
- `list_documents(project_id, document_id=None, query=None, document_type=None, page=1, per_page=10)`
  - List project documents, search, filter by type, or get specific document
- `manage_document(action, project_id, document_id=None, title=None, document_type=None, content=None, ...)`
  - Actions: "create", "update", "delete"

## ๐Ÿ” Research Patterns

### CRITICAL: Keep Queries Short and Focused!
Vector search works best with 2-5 keywords, NOT long sentences or keyword dumps.

โœ… GOOD Queries (concise, focused):
- `rag_search_knowledge_base(query="vector search pgvector")`
- `rag_search_code_examples(query="React useState")`
- `rag_search_knowledge_base(query="authentication JWT")`
- `rag_search_code_examples(query="FastAPI middleware")`

โŒ BAD Queries (too long, unfocused):
- `rag_search_knowledge_base(query="how to implement vector search with pgvector in PostgreSQL for semantic similarity matching with OpenAI embeddings")`
- `rag_search_code_examples(query="React hooks useState useEffect useContext useReducer useMemo useCallback")`

### Query Construction Tips:
- Extract 2-5 most important keywords from the user's request
- Focus on technical terms and specific technologies
- Omit filler words like "how to", "implement", "create", "example"
- For multi-concept searches, do multiple focused queries instead of one broad query

## ๐Ÿ“Š Task Status Flow
`todo` โ†’ `doing` โ†’ `review` โ†’ `done`
- Only ONE task in 'doing' status at a time
- Use 'review' for completed work awaiting validation
- Mark tasks 'done' only after verification

## ๐Ÿ“ Task Granularity Guidelines

### Project Scope Determines Task Granularity

**For Feature-Specific Projects** (project = single feature):
Create granular implementation tasks:
- "Set up development environment"
- "Install required dependencies"
- "Create database schema"
- "Implement API endpoints"
- "Add frontend components"
- "Write unit tests"
- "Add integration tests"
- "Update documentation"

**For Codebase-Wide Projects** (project = entire application):
Create feature-level tasks:
- "Implement user authentication feature"
- "Add payment processing system"
- "Create admin dashboard"
"""

# Initialize the main FastMCP server with fixed configuration
try:
    logger.info("๐Ÿ—๏ธ MCP SERVER INITIALIZATION:")
    logger.info("   Server Name: archon-mcp-server")
    logger.info("   Description: MCP server using HTTP calls")
    mcp = FastMCP(
        "archon-mcp-server",
        description="MCP server for Archon - uses HTTP calls to other services",
        instructions=MCP_INSTRUCTIONS,
        lifespan=lifespan,
        host=server_host,
        port=server_port,
    )
    logger.info("โœ“ FastMCP server instance created successfully")
except Exception as e:
    logger.error(f"โœ— Failed to create FastMCP server: {e}")
    logger.error(traceback.format_exc())
    raise


# Single HTTP /health endpoint for K8s probes and frontend polling.
# NOTE: the original file registered TWO routes on the same path/method;
# they are merged here, keeping the richer context-aware behavior.
@mcp.custom_route("/health", methods=["GET"])
async def http_health_check(request: Request) -> JSONResponse:
    """
    HTTP health check endpoint for K8s probes and frontend monitoring.

    Returns the cached health snapshot (200 when healthy, 503 when degraded)
    once initialization has completed, and a 503 "starting" payload before
    that. For an on-demand re-check of dependent services, use the
    health_check MCP tool.
    """
    try:
        if _shared_context and hasattr(_shared_context, "health_status"):
            # Server is ready - report the last known health status.
            health_data = {
                "status": _shared_context.health_status.get("status", "unknown"),
                "api_service": _shared_context.health_status.get("api_service", False),
                "agents_service": _shared_context.health_status.get("agents_service", False),
                "uptime_seconds": time.time() - _shared_context.startup_time,
                "timestamp": datetime.now().isoformat(),
            }
            status_code = 200 if health_data["status"] == "healthy" else 503
            return JSONResponse(content=health_data, status_code=status_code)

        # Server still starting up.
        return JSONResponse(
            content={
                "status": "starting",
                "message": "MCP server is initializing...",
                "timestamp": datetime.now().isoformat(),
            },
            status_code=503,
        )
    except Exception as e:
        logger.error(f"HTTP health check failed: {e}")
        return JSONResponse(
            {"status": "unhealthy", "error": str(e)}, status_code=503
        )


# MCP tool health check endpoint (detailed health status)
@mcp.tool()
async def health_check(ctx: Context) -> str:
    """
    Check health status of MCP server and dependencies.

    Returns:
        JSON with health status, uptime, and service availability
    """
    try:
        # Try to get the lifespan context
        context = getattr(ctx.request_context, "lifespan_context", None)

        if context is None:
            # Server starting up
            return json.dumps({
                "success": True,
                "status": "starting",
                "message": "MCP server is initializing...",
                "timestamp": datetime.now().isoformat(),
            })

        # Server is ready - perform health checks
        if hasattr(context, "health_status") and context.health_status:
            await perform_health_checks(context)
            return json.dumps({
                "success": True,
                "health": context.health_status,
                "uptime_seconds": time.time() - context.startup_time,
                "timestamp": datetime.now().isoformat(),
            })

        return json.dumps({
            "success": True,
            "status": "ready",
            "message": "MCP server is running",
            "timestamp": datetime.now().isoformat(),
        })
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return json.dumps({
            "success": False,
            "error": f"Health check failed: {str(e)}",
            "timestamp": datetime.now().isoformat(),
        })


# Session management endpoint
@mcp.tool()
async def session_info(ctx: Context) -> str:
    """
    Get current and active session information.

    Returns:
        JSON with active sessions count and server uptime
    """
    try:
        session_manager = get_session_manager()

        # Build session info
        session_info_data = {
            "active_sessions": session_manager.get_active_session_count(),
            "session_timeout": session_manager.timeout,
        }

        # Add server uptime when the lifespan context is available.
        context = getattr(ctx.request_context, "lifespan_context", None)
        if context and hasattr(context, "startup_time"):
            session_info_data["server_uptime_seconds"] = time.time() - context.startup_time

        return json.dumps({
            "success": True,
            "session_management": session_info_data,
            "timestamp": datetime.now().isoformat(),
        })
    except Exception as e:
        logger.error(f"Session info failed: {e}")
        return json.dumps({
            "success": False,
            "error": f"Failed to get session info: {str(e)}",
            "timestamp": datetime.now().isoformat(),
        })


def _register_one(module_path: str, func_name: str, label: str) -> bool:
    """Import one feature module, call its registration function on ``mcp``.

    Returns True on success. Error policy (uniform for all modules):
    - ImportError / missing name: module is optional, warn and skip.
    - SyntaxError/NameError/AttributeError: broken code, log and re-raise
      so the server does not run with defective tools.
    - Any other exception: log and skip so remaining modules still register.
    """
    import importlib

    try:
        module = importlib.import_module(module_path)
        register = getattr(module, func_name, None)
        if register is None:
            # Mirror `from x import y` semantics: missing name == optional.
            logger.warning(
                f"โš  {label} module not available (optional): "
                f"cannot import name {func_name!r} from {module_path!r}"
            )
            return False
        register(mcp)
        logger.info(f"โœ“ {label} registered")
        return True
    except ImportError as e:
        # Module not found - acceptable in the modular architecture.
        logger.warning(f"โš  {label} module not available (optional): {e}")
        return False
    except (SyntaxError, NameError, AttributeError) as e:
        # Code errors that should not be ignored.
        logger.error(f"โœ— Code error in {label} - MUST FIX: {e}")
        logger.error(traceback.format_exc())
        raise  # Re-raise to prevent running with broken code
    except Exception as e:
        # Unexpected errors during registration - don't block other modules.
        logger.error(f"โœ— Failed to register {label}: {e}")
        logger.error(traceback.format_exc())
        return False


def register_modules():
    """Register all MCP tool modules (all optional individually)."""
    logger.info("๐Ÿ”ง Registering MCP tool modules...")

    # (module path, registration function, human-readable label)
    module_specs = [
        ("src.mcp_server.features.rag", "register_rag_tools", "RAG module (HTTP-based)"),
        ("src.mcp_server.features.projects", "register_project_tools", "Project tools"),
        ("src.mcp_server.features.tasks", "register_task_tools", "Task tools"),
        ("src.mcp_server.features.documents", "register_document_tools", "Document tools"),
        ("src.mcp_server.features.documents", "register_version_tools", "Version tools"),
        ("src.mcp_server.features.feature_tools", "register_feature_tools", "Feature tools"),
    ]

    modules_registered = sum(
        1 for spec in module_specs if _register_one(*spec)
    )

    logger.info(f"๐Ÿ“ฆ Total modules registered: {modules_registered}")
    if modules_registered == 0:
        logger.error("๐Ÿ’ฅ No modules were successfully registered!")
        raise RuntimeError("No MCP modules available")


# Register all modules when this file is imported
try:
    register_modules()
except Exception as e:
    logger.error(f"๐Ÿ’ฅ Critical error during module registration: {e}")
    logger.error(traceback.format_exc())
    raise


def main():
    """Main entry point for the MCP server."""
    try:
        # Initialize Logfire first
        setup_logfire(service_name="archon-mcp-server")

        logger.info("๐Ÿš€ Starting Archon MCP Server")
        logger.info("   Mode: Streamable HTTP")
        logger.info(f"   URL: http://{server_host}:{server_port}/mcp")

        mcp_logger.info("๐Ÿ”ฅ Logfire initialized for MCP server")
        mcp_logger.info(f"๐ŸŒŸ Starting MCP server - host={server_host}, port={server_port}")
        logger.info("โœ“ HTTP /health endpoint will be registered via @mcp.custom_route")

        mcp.run(transport="streamable-http")
    except Exception as e:
        mcp_logger.error(f"๐Ÿ’ฅ Fatal error in main - error={str(e)}, error_type={type(e).__name__}")
        logger.error(f"๐Ÿ’ฅ Fatal error in main: {e}")
        logger.error(traceback.format_exc())
        raise


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        logger.info("๐Ÿ‘‹ MCP server stopped by user")
    except Exception as e:
        logger.error(f"๐Ÿ’ฅ Unhandled exception: {e}")
        logger.error(traceback.format_exc())
        sys.exit(1)