"""LangGraph workflow that scopes, codes and wraps up a Pydantic AI agent.

The graph built at the bottom of this module runs these nodes:

1. ``define_scope_with_reasoner`` - asks the reasoner model for a scope document.
2. ``coder_agent`` - runs the coding agent and streams its answer.
3. ``get_next_user_message`` - interrupts the graph to wait for the user.
4. ``route_user_message`` (conditional edge) - loops back to the coder or ends.
5. ``finish_conversation`` - streams closing instructions, then the graph ends.
"""
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai import Agent, RunContext
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated, List, Any
from langgraph.config import get_stream_writer
from langgraph.types import interrupt
from dotenv import load_dotenv
from openai import AsyncOpenAI
from supabase import Client
import logfire
import os
import sys
from utils.utils import get_env_var

# Import the message classes from Pydantic AI
from pydantic_ai.messages import (
    ModelMessage,
    ModelMessagesTypeAdapter
)

# Add the parent directory to Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from archon.pydantic_ai_coder import pydantic_ai_coder, PydanticAIDeps, list_documentation_pages_helper

# Load environment variables
load_dotenv()

# Configure logfire to suppress warnings (optional)
logfire.configure(send_to_logfire='never')

base_url = get_env_var('BASE_URL') or 'https://api.openai.com/v1'
api_key = get_env_var('LLM_API_KEY') or 'no-llm-api-key-provided'

# The provider is inferred purely from substrings of the configured base URL.
is_ollama = "localhost" in base_url.lower()
is_anthropic = "anthropic" in base_url.lower()
is_openai = "openai" in base_url.lower()

reasoner_llm_model_name = get_env_var('REASONER_MODEL') or 'o3-mini'
reasoner_llm_model = (
    AnthropicModel(reasoner_llm_model_name, api_key=api_key)
    if is_anthropic
    else OpenAIModel(reasoner_llm_model_name, base_url=base_url, api_key=api_key)
)

reasoner = Agent(
    reasoner_llm_model,
    system_prompt='You are an expert at coding AI agents with Pydantic AI and defining the scope for doing so.',
)

primary_llm_model_name = get_env_var('PRIMARY_MODEL') or 'gpt-4o-mini'
primary_llm_model = (
    AnthropicModel(primary_llm_model_name, api_key=api_key)
    if is_anthropic
    else OpenAIModel(primary_llm_model_name, base_url=base_url, api_key=api_key)
)

router_agent = Agent(
    primary_llm_model,
    system_prompt='Your job is to route the user message either to the end of the conversation or to continue coding the AI agent.',
)

end_conversation_agent = Agent(
    primary_llm_model,
    system_prompt='Your job is to end a conversation for creating an AI agent by giving instructions for how to execute the agent and they saying a nice goodbye to the user.',
)

# Client handed to the coder agent's dependencies; stays None when neither a
# localhost base URL nor OPENAI_API_KEY is configured.
if is_ollama:
    openai_client = AsyncOpenAI(base_url=base_url, api_key=api_key)
elif get_env_var("OPENAI_API_KEY"):
    openai_client = AsyncOpenAI(api_key=get_env_var("OPENAI_API_KEY"))
else:
    openai_client = None

if get_env_var("SUPABASE_URL"):
    supabase: Client = Client(
        get_env_var("SUPABASE_URL"),
        get_env_var("SUPABASE_SERVICE_KEY")
    )
else:
    supabase = None


# Define state schema
class AgentState(TypedDict):
    """State carried between the nodes of the graph."""

    # Most recent message from the user.
    latest_user_message: str
    # JSON-serialized Pydantic AI message batches; the reducer concatenates
    # each node's returned list onto the existing one.
    messages: Annotated[List[bytes], lambda x, y: x + y]
    # Scope document produced by the reasoner.
    scope: str


def _load_message_history(state: AgentState) -> list[ModelMessage]:
    """Decode every serialized batch in ``state['messages']`` into one flat list."""
    message_history: list[ModelMessage] = []
    for message_row in state['messages']:
        message_history.extend(ModelMessagesTypeAdapter.validate_json(message_row))
    return message_history


async def _run_and_stream(agent, user_message, message_history, writer, **run_kwargs):
    """Run ``agent`` on ``user_message`` and push its text output to a stream writer.

    When ``is_openai`` is false the agent is run to completion and the whole
    result is written once through ``get_stream_writer()``; otherwise the run is
    streamed and each text delta is passed to the supplied ``writer``.

    Args:
        agent: The Pydantic AI agent to run.
        user_message: Prompt passed to the agent.
        message_history: Prior messages passed as ``message_history``.
        writer: Stream writer received by the calling node.
        **run_kwargs: Extra keyword arguments forwarded to ``run``/``run_stream``.

    Returns:
        The value of ``result.new_messages_json()`` for this run.
    """
    if not is_openai:
        writer = get_stream_writer()
        result = await agent.run(user_message, message_history=message_history, **run_kwargs)
        writer(result.data)
    else:
        async with agent.run_stream(
            user_message, message_history=message_history, **run_kwargs
        ) as result:
            # Stream partial text as it arrives
            async for chunk in result.stream_text(delta=True):
                writer(chunk)

    return result.new_messages_json()


# Scope Definition Node with Reasoner LLM
async def define_scope_with_reasoner(state: AgentState):
    """Ask the reasoner for a scope document, save it, and return it.

    The scope is written to ``workbench/scope.md`` one directory above this
    file's directory (the ``workbench`` directory is created if missing).

    Returns:
        A state update of the form ``{"scope": <scope text>}``.
    """
    # First, get the documentation pages so the reasoner can decide which ones are necessary
    documentation_pages = await list_documentation_pages_helper(supabase)
    documentation_pages_str = "\n".join(documentation_pages)

    # Then, use the reasoner to define the scope
    prompt = f"""
    User AI Agent Request: {state['latest_user_message']}

    Create detailed scope document for the AI agent including:
    - Architecture diagram
    - Core components
    - External dependencies
    - Testing strategy

    Also based on these documentation pages available:

    {documentation_pages_str}

    Include a list of documentation pages that are relevant to creating this agent for the user in the scope document.
    """

    result = await reasoner.run(prompt)
    scope = result.data

    # Get the directory one level up from the current file
    current_dir = os.path.dirname(os.path.abspath(__file__))
    parent_dir = os.path.dirname(current_dir)
    workbench_dir = os.path.join(parent_dir, "workbench")
    scope_path = os.path.join(workbench_dir, "scope.md")
    os.makedirs(workbench_dir, exist_ok=True)

    with open(scope_path, "w", encoding="utf-8") as f:
        f.write(scope)

    return {"scope": scope}


# Coding Node with Feedback Handling
async def coder_agent(state: AgentState, writer):
    """Run the coding agent on the latest user message and stream its reply.

    Returns:
        A state update ``{"messages": [<new messages JSON>]}`` that the state
        reducer appends to the existing message list.
    """
    # Prepare dependencies
    deps = PydanticAIDeps(
        supabase=supabase,
        openai_client=openai_client,
        reasoner_output=state['scope']
    )

    new_messages = await _run_and_stream(
        pydantic_ai_coder,
        state['latest_user_message'],
        _load_message_history(state),
        writer,
        deps=deps,
    )
    return {"messages": [new_messages]}


# Interrupt the graph to get the user's next message
def get_next_user_message(state: AgentState):
    """Interrupt the graph and store the resumed value as the latest user message."""
    value = interrupt({})

    # Set the user's latest message for the LLM to continue the conversation
    return {
        "latest_user_message": value
    }


# Determine if the user is finished creating their AI agent or not
async def route_user_message(state: AgentState):
    """Return the name of the next node: "finish_conversation" or "coder_agent".

    The router model is asked to answer with one of the two node names. Its
    reply is normalized (surrounding whitespace, quotes, backticks and periods
    removed, lower-cased) before comparison so that harmless formatting in the
    model output does not send a finished conversation back to the coder.
    Anything other than "finish_conversation" routes to "coder_agent".
    """
    prompt = f"""
    The user has sent a message:

    {state['latest_user_message']}

    If the user wants to end the conversation, respond with just the text "finish_conversation".
    If the user wants to continue coding the AI agent, respond with just the text "coder_agent".
    """

    result = await router_agent.run(prompt)
    next_action = str(result.data).strip().strip("\"'`.").strip().lower()

    if next_action == "finish_conversation":
        return "finish_conversation"
    return "coder_agent"


# End of conversation agent to give instructions for executing the agent
async def finish_conversation(state: AgentState, writer):
    """Run the end-of-conversation agent and stream its reply.

    Returns:
        A state update ``{"messages": [<new messages JSON>]}`` that the state
        reducer appends to the existing message list.
    """
    new_messages = await _run_and_stream(
        end_conversation_agent,
        state['latest_user_message'],
        _load_message_history(state),
        writer,
    )
    return {"messages": [new_messages]}


# Build workflow
builder = StateGraph(AgentState)

# Add nodes
builder.add_node("define_scope_with_reasoner", define_scope_with_reasoner)
builder.add_node("coder_agent", coder_agent)
builder.add_node("get_next_user_message", get_next_user_message)
builder.add_node("finish_conversation", finish_conversation)

# Set edges
builder.add_edge(START, "define_scope_with_reasoner")
builder.add_edge("define_scope_with_reasoner", "coder_agent")
builder.add_edge("coder_agent", "get_next_user_message")
builder.add_conditional_edges(
    "get_next_user_message",
    route_user_message,
    {"coder_agent": "coder_agent", "finish_conversation": "finish_conversation"}
)
builder.add_edge("finish_conversation", END)

# Configure persistence
memory = MemorySaver()
agentic_flow = builder.compile(checkpointer=memory)