Add task management tools with smart routing

Extract task functionality into focused tools:
- create_task: Create tasks with sources and code examples
- list_tasks: List tasks with project/status filtering
- get_task: Retrieve task details
- update_task: Modify task properties
- delete_task: Archive tasks (soft delete)

Preserves intelligent endpoint routing:
- Project-specific: /api/projects/{id}/tasks
- Status filtering: /api/tasks?status=X
- Assignee filtering: /api/tasks?assignee=X
This commit is contained in:
Rasmus Widing 2025-08-18 20:42:04 +03:00
parent 4f317d9ff5
commit f786a8026b
2 changed files with 389 additions and 0 deletions

View File

@ -0,0 +1,14 @@
"""
Task management tools for Archon MCP Server.
This module provides separate tools for each task operation:
- create_task: Create a new task
- list_tasks: List tasks with filtering
- get_task: Get task details
- update_task: Update task properties
- delete_task: Delete a task
"""
from .task_tools import register_task_tools
__all__ = ["register_task_tools"]

View File

@ -0,0 +1,375 @@
"""
Simple task management tools for Archon MCP Server.
Provides separate, focused tools for each task operation.
Mirrors the functionality of the original manage_task tool but with individual tools.
"""
import json
import logging
from typing import Any, Optional, List, Dict
from urllib.parse import urljoin
import httpx
from mcp.server.fastmcp import Context, FastMCP
from src.server.config.service_discovery import get_api_url
logger = logging.getLogger(__name__)
def register_task_tools(mcp: FastMCP):
"""Register individual task management tools with the MCP server."""
@mcp.tool()
async def create_task(
ctx: Context,
project_id: str,
title: str,
description: str = "",
assignee: str = "User",
task_order: int = 0,
feature: Optional[str] = None,
sources: Optional[List[Dict[str, str]]] = None,
code_examples: Optional[List[Dict[str, str]]] = None,
) -> str:
"""
Create a new task in a project.
Args:
project_id: Project UUID (required)
title: Task title - should be specific and actionable (required)
description: Detailed task description with acceptance criteria
assignee: Who will work on this task. Options:
- "User": For manual tasks
- "Archon": For AI-driven tasks
- "AI IDE Agent": For code implementation
- "prp-executor": For PRP coordination
- "prp-validator": For testing/validation
task_order: Priority within status (0-100, higher = more priority)
feature: Feature label for grouping related tasks (e.g., "authentication")
sources: List of source references. Each source should have:
- "url": Link to documentation or file path
- "type": Type of source (e.g., "documentation", "api_spec")
- "relevance": Why this source is relevant
code_examples: List of code examples. Each example should have:
- "file": Path to the file
- "function": Function or class name
- "purpose": Why this example is relevant
Returns:
JSON with task details including task_id:
{
"success": true,
"task": {...},
"task_id": "task-123",
"message": "Task created successfully"
}
Examples:
# Simple task
create_task(
project_id="550e8400-e29b-41d4-a716-446655440000",
title="Add user authentication",
description="Implement JWT-based authentication with refresh tokens"
)
# Task with sources and examples
create_task(
project_id="550e8400-e29b-41d4-a716-446655440000",
title="Implement OAuth2 Google provider",
description="Add Google OAuth2 with PKCE security",
assignee="AI IDE Agent",
task_order=10,
feature="authentication",
sources=[
{
"url": "https://developers.google.com/identity/protocols/oauth2",
"type": "documentation",
"relevance": "Official OAuth2 implementation guide"
},
{
"url": "docs/auth/README.md",
"type": "internal_docs",
"relevance": "Current auth architecture"
}
],
code_examples=[
{
"file": "src/auth/base.py",
"function": "BaseAuthProvider",
"purpose": "Base class to extend"
},
{
"file": "tests/auth/test_oauth.py",
"function": "test_oauth_flow",
"purpose": "Test pattern to follow"
}
]
)
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
urljoin(api_url, "/api/tasks"),
json={
"project_id": project_id,
"title": title,
"description": description,
"assignee": assignee,
"task_order": task_order,
"feature": feature,
"sources": sources or [],
"code_examples": code_examples or [],
},
)
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"task": result.get("task"),
"task_id": result.get("task", {}).get("id"),
"message": result.get("message", "Task created successfully"),
})
else:
error_detail = response.text
return json.dumps({"success": False, "error": error_detail})
except Exception as e:
logger.error(f"Error creating task: {e}")
return json.dumps({"success": False, "error": str(e)})
@mcp.tool()
async def list_tasks(
ctx: Context,
filter_by: Optional[str] = None,
filter_value: Optional[str] = None,
project_id: Optional[str] = None,
include_closed: bool = False,
page: int = 1,
per_page: int = 50,
) -> str:
"""
List tasks with filtering options.
Args:
filter_by: "status" | "project" | "assignee" (optional)
filter_value: Filter value (e.g., "todo", "doing", "review", "done")
project_id: Project UUID (optional, for additional filtering)
include_closed: Include done tasks in results
page: Page number for pagination
per_page: Items per page
Returns:
JSON array of tasks with pagination info
Examples:
list_tasks() # All tasks
list_tasks(filter_by="status", filter_value="todo") # Only todo tasks
list_tasks(filter_by="project", filter_value="project-uuid") # Tasks for specific project
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
# Build URL and parameters based on filter type
params = {
"page": page,
"per_page": per_page,
"exclude_large_fields": True, # Always exclude large fields in MCP responses
}
if filter_by == "project" and filter_value:
# Use project-specific endpoint for project filtering
url = urljoin(api_url, f"/api/projects/{filter_value}/tasks")
params["include_archived"] = False # For backward compatibility
elif filter_by == "status" and filter_value:
# Use generic tasks endpoint for status filtering
url = urljoin(api_url, "/api/tasks")
params["status"] = filter_value
params["include_closed"] = include_closed
if project_id:
params["project_id"] = project_id
else:
# Default to generic tasks endpoint
url = urljoin(api_url, "/api/tasks")
params["include_closed"] = include_closed
if project_id:
params["project_id"] = project_id
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(url, params=params)
response.raise_for_status()
result = response.json()
# Handle both direct array and paginated response formats
if isinstance(result, list):
tasks = result
pagination_info = None
else:
if "tasks" in result:
tasks = result.get("tasks", [])
pagination_info = result.get("pagination", {})
else:
tasks = result if isinstance(result, list) else []
pagination_info = None
return json.dumps({
"success": True,
"tasks": tasks,
"pagination": pagination_info,
"total_count": len(tasks)
if pagination_info is None
else pagination_info.get("total", len(tasks)),
"count": len(tasks),
})
except Exception as e:
logger.error(f"Error listing tasks: {e}")
return json.dumps({"success": False, "error": str(e)})
@mcp.tool()
async def get_task(ctx: Context, task_id: str) -> str:
"""
Get detailed information about a specific task.
Args:
task_id: UUID of the task
Returns:
JSON with complete task details
Example:
get_task(task_id="task-uuid")
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(urljoin(api_url, f"/api/tasks/{task_id}"))
if response.status_code == 200:
task = response.json()
return json.dumps({"success": True, "task": task})
elif response.status_code == 404:
return json.dumps({"success": False, "error": f"Task {task_id} not found"})
else:
return json.dumps({"success": False, "error": "Failed to get task"})
except Exception as e:
logger.error(f"Error getting task: {e}")
return json.dumps({"success": False, "error": str(e)})
@mcp.tool()
async def update_task(
ctx: Context,
task_id: str,
update_fields: Dict[str, Any],
) -> str:
"""
Update a task's properties.
Args:
task_id: UUID of the task to update
update_fields: Dict of fields to update (e.g., {"status": "doing", "assignee": "AI IDE Agent"})
Returns:
JSON with updated task details
Examples:
update_task(task_id="uuid", update_fields={"status": "doing"})
update_task(task_id="uuid", update_fields={"title": "New Title", "description": "Updated description"})
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.put(
urljoin(api_url, f"/api/tasks/{task_id}"), json=update_fields
)
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"task": result.get("task"),
"message": result.get("message", "Task updated successfully"),
})
else:
error_detail = response.text
return json.dumps({"success": False, "error": error_detail})
except Exception as e:
logger.error(f"Error updating task: {e}")
return json.dumps({"success": False, "error": str(e)})
@mcp.tool()
async def delete_task(ctx: Context, task_id: str) -> str:
"""
Delete/archive a task.
This removes the task from active lists but preserves it in the database
for audit purposes (soft delete).
Args:
task_id: UUID of the task to delete/archive
Returns:
JSON confirmation of deletion:
{
"success": true,
"message": "Task deleted successfully",
"subtasks_archived": 0
}
Example:
delete_task(task_id="task-123e4567-e89b-12d3-a456-426614174000")
"""
try:
api_url = get_api_url()
timeout = httpx.Timeout(30.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.delete(urljoin(api_url, f"/api/tasks/{task_id}"))
if response.status_code == 200:
result = response.json()
return json.dumps({
"success": True,
"message": result.get("message", f"Task {task_id} deleted successfully"),
"subtasks_archived": result.get("subtasks_archived", 0),
})
elif response.status_code == 404:
return json.dumps({
"success": False,
"error": f"Task {task_id} not found. Use list_tasks to find valid task IDs."
})
elif response.status_code == 400:
# More specific error for bad requests
error_text = response.text
if "already archived" in error_text.lower():
return json.dumps({
"success": False,
"error": f"Task {task_id} is already archived. No further action needed."
})
return json.dumps({
"success": False,
"error": f"Cannot delete task: {error_text}"
})
else:
return json.dumps({
"success": False,
"error": f"Failed to delete task (HTTP {response.status_code}): {response.text}"
})
except Exception as e:
logger.error(f"Error deleting task: {e}")
return json.dumps({"success": False, "error": str(e)})