diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..56b6e10 --- /dev/null +++ b/.env.example @@ -0,0 +1,33 @@ +# Base URL for the OpenAI instance (default is https://api.openai.com/v1) +# OpenAI: https://api.openai.com/v1 +# Ollama (example): http://localhost:11434/v1 +# OpenRouter: https://openrouter.ai/api/v1 +BASE_URL= + +# Get your Open AI API Key by following these instructions - +# https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key +# Even if using OpenRouter/Ollama, you still need to set this for the embedding model. +# Future versions of Archon will be more flexible with this. +OPENAI_API_KEY= + +# For OpenAI: https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key +# For OpenRouter: https://openrouter.ai/keys +LLM_API_KEY= + +# For the Supabase version (sample_supabase_agent.py), set your Supabase URL and Service Key. +# Get your SUPABASE_URL from the API section of your Supabase project settings - +# https://supabase.com/dashboard/project//settings/api +SUPABASE_URL= + +# Get your SUPABASE_SERVICE_KEY from the API section of your Supabase project settings - +# https://supabase.com/dashboard/project//settings/api +# On this page it is called the service_role secret. +SUPABASE_SERVICE_KEY= + +# The LLM you want to use for the reasoner (o3-mini, R1, QwQ, etc.). +# Example: o3-mini +REASONER_MODEL= + +# The LLM you want to use for the primary agent/coder. +# Example: gpt-4o-mini +PRIMARY_MODEL= \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9423fd0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +# Folders +workbench +__pycache__ +venv +.langgraph_api + +# Files +.env \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..3bd1e8b --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 oTTomator and Archon contributors + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..2c79e48 --- /dev/null +++ b/README.md @@ -0,0 +1,150 @@ +# Archon - AI Agent Builder + +Archon Logo + +
🚀 **CURRENT VERSION** 🚀

**[ V2 - Agentic Workflow ]**
*Using LangGraph + Pydantic AI for multi-agent orchestration and planning*
+ +Archon is an AI meta-agent designed to autonomously build, refine, and optimize other AI agents. + +It serves both as a practical tool for developers and as an educational framework demonstrating the evolution of agentic systems. +Archon will be developed in iterations, starting with just a simple Pydantic AI agent that can build other Pydantic AI agents, +all the way to a full agentic workflow using LangGraph that can build other AI agents with any framework. +Through its iterative development, Archon showcases the power of planning, feedback loops, and domain-specific knowledge in creating robust AI agents. + +The current version of Archon is V2 as mentioned above - see [V2 Documentation](iterations/v2-agentic-workflow/README.md) for details. + +## Vision + +Archon demonstrates three key principles in modern AI development: + +1. **Agentic Reasoning**: Planning, iterative feedback, and self-evaluation overcome the limitations of purely reactive systems +2. **Domain Knowledge Integration**: Seamless embedding of frameworks like Pydantic AI and LangGraph within autonomous workflows +3. **Scalable Architecture**: Modular design supporting maintainability, cost optimization, and ethical AI practices + +## Project Evolution + +### V1: Single-Agent Foundation +- Basic RAG-powered agent using Pydantic AI +- Supabase vector database for documentation storage +- Simple code generation without validation +- [Learn more about V1](iterations/v1-single-agent/README.md) + +### V2: Current - Agentic Workflow (LangGraph) +- Multi-agent system with planning and execution separation +- Reasoning LLM (O3-mini/R1) for architecture planning +- LangGraph for workflow orchestration +- Support for local LLMs via Ollama +- [Learn more about V2](iterations/v2-agentic-workflow/README.md) + +### Future Iterations +- V3: Self-Feedback Loop - Automated validation and error correction +- V4: Tool Library Integration - Pre-built external tool incorporation +- V5: Multi-Framework Support - Framework-agnostic agent generation +- V6: Autonomous Framework Learning - Self-updating framework adapters + +### Future Integrations +- Docker +- LangSmith +- MCP +- Other frameworks besides Pydantic AI +- Other vector databases besides Supabase + +## Getting Started with V2 (current version) + +Since V2 is the current version of Archon, all the code for V2 is in both the `archon` and `archon/iterations/v2-agentic-workflow` directories. + +### Prerequisites +- Python 3.11+ +- Supabase account and database +- OpenAI/OpenRouter API key or Ollama for local LLMs +- Streamlit (for web interface) + +### Installation + +1. Clone the repository: +```bash +git clone https://github.com/coleam00/archon.git +cd archon +``` + +2. Install dependencies: +```bash +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate +pip install -r requirements.txt +``` + +3. Configure environment: + - Rename `.env.example` to `.env` + - Edit `.env` with your settings: + ```env + BASE_URL=https://api.openai.com/v1 for OpenAI, https://api.openrouter.ai/v1 for OpenRouter, or your Ollama URL + LLM_API_KEY=your_openai_or_openrouter_api_key + OPENAI_API_KEY=your_openai_api_key # Required for embeddings + SUPABASE_URL=your_supabase_url + SUPABASE_SERVICE_KEY=your_supabase_service_key + PRIMARY_MODEL=gpt-4o-mini # Main agent model + REASONER_MODEL=o3-mini # Planning model + ``` + +### Quick Start + +1. 
Set up the database: + - Execute `site_pages.sql` in your Supabase SQL Editor + - This creates tables and enables vector similarity search + +2. Crawl documentation: +```bash +python crawl_pydantic_ai_docs.py +``` + +3. Launch the UI: +```bash +streamlit run streamlit_ui.py +``` + +Visit `http://localhost:8501` to start building AI agents! + +## Architecture + +### Current V2 Components +- `archon_graph.py`: LangGraph workflow and agent coordination +- `pydantic_ai_coder.py`: Main coding agent with RAG capabilities +- `crawl_pydantic_ai_docs.py`: Documentation processor +- `streamlit_ui.py`: Interactive web interface +- `site_pages.sql`: Database schema + +### Database Schema +```sql +CREATE TABLE site_pages ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + url TEXT, + chunk_number INTEGER, + title TEXT, + summary TEXT, + content TEXT, + metadata JSONB, + embedding VECTOR(1536) +); +``` + +## Contributing + +We welcome contributions! Whether you're fixing bugs, adding features, or improving documentation, please feel free to submit a Pull Request. + +## License + +[MIT License](LICENSE) + +--- + +For version-specific details: +- [V1 Documentation](iterations/v1-single-agent/README.md) +- [V2 Documentation](iterations/v2-agentic-workflow/README.md) diff --git a/archon_graph.py b/archon_graph.py new file mode 100644 index 0000000..42dffc4 --- /dev/null +++ b/archon_graph.py @@ -0,0 +1,201 @@ +from pydantic_ai.models.openai import OpenAIModel +from pydantic_ai import Agent, RunContext +from langgraph.graph import StateGraph, START, END +from langgraph.checkpoint.memory import MemorySaver +from typing import TypedDict, Annotated, List, Any +from langgraph.config import get_stream_writer +from langgraph.types import interrupt +from dotenv import load_dotenv +from openai import AsyncOpenAI +from supabase import Client +import logfire +import os + +# Import the message classes from Pydantic AI +from pydantic_ai.messages import ( + ModelMessage, + ModelMessagesTypeAdapter +) + +from pydantic_ai_coder import pydantic_ai_coder, PydanticAIDeps, list_documentation_pages_helper + +# Load environment variables +load_dotenv() + +# Configure logfire to suppress warnings (optional) +logfire.configure(send_to_logfire='never') + +base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1') +api_key = os.getenv('LLM_API_KEY', 'no-llm-api-key-provided') +is_ollama = "localhost" in base_url.lower() +reasoner_llm_model = os.getenv('REASONER_MODEL', 'o3-mini') +reasoner = Agent( + OpenAIModel(reasoner_llm_model, base_url=base_url, api_key=api_key), + system_prompt='You are an expert at coding AI agents with Pydantic AI and defining the scope for doing so.', +) + +primary_llm_model = os.getenv('PRIMARY_MODEL', 'gpt-4o-mini') +router_agent = Agent( + OpenAIModel(primary_llm_model, base_url=base_url, api_key=api_key), + system_prompt='Your job is to route the user message either to the end of the conversation or to continue coding the AI agent.', +) + +end_conversation_agent = Agent( + OpenAIModel(primary_llm_model, base_url=base_url, api_key=api_key), + system_prompt='Your job is to end a conversation for creating an AI agent by giving instructions for how to execute the agent and they saying a nice goodbye to the user.', +) + +openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) +supabase: Client = Client( + os.getenv("SUPABASE_URL"), + os.getenv("SUPABASE_SERVICE_KEY") +) + +# Define state schema +class AgentState(TypedDict): + latest_user_message: str + messages: Annotated[List[bytes], lambda x, 
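+    # LangGraph uses the lambda in Annotated[...] as a reducer: each node's returned list of
+    # serialized message blobs is appended to the existing history instead of replacing it.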
y: x + y] + scope: str + +# Scope Definition Node with Reasoner LLM +async def define_scope_with_reasoner(state: AgentState): + # First, get the documentation pages so the reasoner can decide which ones are necessary + documentation_pages = await list_documentation_pages_helper(supabase) + documentation_pages_str = "\n".join(documentation_pages) + + # Then, use the reasoner to define the scope + prompt = f""" + User AI Agent Request: {state['latest_user_message']} + + Create detailed scope document for the AI agent including: + - Architecture diagram + - Core components + - External dependencies + - Testing strategy + + Also based on these documentation pages available: + + {documentation_pages_str} + + Include a list of documentation pages that are relevant to creating this agent for the user in the scope document. + """ + + result = await reasoner.run(prompt) + scope = result.data + + # Save the scope to a file + scope_path = os.path.join("workbench", "scope.md") + os.makedirs("workbench", exist_ok=True) + + with open(scope_path, "w", encoding="utf-8") as f: + f.write(scope) + + return {"scope": scope} + +# Coding Node with Feedback Handling +async def coder_agent(state: AgentState, writer): + # Prepare dependencies + deps = PydanticAIDeps( + supabase=supabase, + openai_client=openai_client, + reasoner_output=state['scope'] + ) + + # Get the message history into the format for Pydantic AI + message_history: list[ModelMessage] = [] + for message_row in state['messages']: + message_history.extend(ModelMessagesTypeAdapter.validate_json(message_row)) + + # Run the agent in a stream + if is_ollama: + writer = get_stream_writer() + result = await pydantic_ai_coder.run(state['latest_user_message'], deps=deps, message_history= message_history) + writer(result.data) + else: + async with pydantic_ai_coder.run_stream( + state['latest_user_message'], + deps=deps, + message_history= message_history + ) as result: + # Stream partial text as it arrives + async for chunk in result.stream_text(delta=True): + writer(chunk) + + # print(ModelMessagesTypeAdapter.validate_json(result.new_messages_json())) + + return {"messages": [result.new_messages_json()]} + +# Interrupt the graph to get the user's next message +def get_next_user_message(state: AgentState): + value = interrupt({}) + + # Set the user's latest message for the LLM to continue the conversation + return { + "latest_user_message": value + } + +# Determine if the user is finished creating their AI agent or not +async def route_user_message(state: AgentState): + prompt = f""" + The user has sent a message: + + {state['latest_user_message']} + + If the user wants to end the conversation, respond with just the text "finish_conversation". + If the user wants to continue coding the AI agent, respond with just the text "coder_agent". 
+ """ + + result = await router_agent.run(prompt) + next_action = result.data + + if next_action == "finish_conversation": + return "finish_conversation" + else: + return "coder_agent" + +# End of conversation agent to give instructions for executing the agent +async def finish_conversation(state: AgentState, writer): + # Get the message history into the format for Pydantic AI + message_history: list[ModelMessage] = [] + for message_row in state['messages']: + message_history.extend(ModelMessagesTypeAdapter.validate_json(message_row)) + + # Run the agent in a stream + if is_ollama: + writer = get_stream_writer() + result = await end_conversation_agent.run(state['latest_user_message'], message_history= message_history) + writer(result.data) + else: + async with end_conversation_agent.run_stream( + state['latest_user_message'], + message_history= message_history + ) as result: + # Stream partial text as it arrives + async for chunk in result.stream_text(delta=True): + writer(chunk) + + return {"messages": [result.new_messages_json()]} + +# Build workflow +builder = StateGraph(AgentState) + +# Add nodes +builder.add_node("define_scope_with_reasoner", define_scope_with_reasoner) +builder.add_node("coder_agent", coder_agent) +builder.add_node("get_next_user_message", get_next_user_message) +builder.add_node("finish_conversation", finish_conversation) + +# Set edges +builder.add_edge(START, "define_scope_with_reasoner") +builder.add_edge("define_scope_with_reasoner", "coder_agent") +builder.add_edge("coder_agent", "get_next_user_message") +builder.add_conditional_edges( + "get_next_user_message", + route_user_message, + {"coder_agent": "coder_agent", "finish_conversation": "finish_conversation"} +) +builder.add_edge("finish_conversation", END) + +# Configure persistence +memory = MemorySaver() +agentic_flow = builder.compile(checkpointer=memory) \ No newline at end of file diff --git a/crawl_pydantic_ai_docs.py b/crawl_pydantic_ai_docs.py new file mode 100644 index 0000000..81e897f --- /dev/null +++ b/crawl_pydantic_ai_docs.py @@ -0,0 +1,245 @@ +import os +import sys +import json +import asyncio +import requests +from xml.etree import ElementTree +from typing import List, Dict, Any +from dataclasses import dataclass +from datetime import datetime, timezone +from urllib.parse import urlparse +from dotenv import load_dotenv + +from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode +from openai import AsyncOpenAI +from supabase import create_client, Client + +load_dotenv() + +# Initialize OpenAI and Supabase clients +openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) +supabase: Client = create_client( + os.getenv("SUPABASE_URL"), + os.getenv("SUPABASE_SERVICE_KEY") +) + +@dataclass +class ProcessedChunk: + url: str + chunk_number: int + title: str + summary: str + content: str + metadata: Dict[str, Any] + embedding: List[float] + +def chunk_text(text: str, chunk_size: int = 5000) -> List[str]: + """Split text into chunks, respecting code blocks and paragraphs.""" + chunks = [] + start = 0 + text_length = len(text) + + while start < text_length: + # Calculate end position + end = start + chunk_size + + # If we're at the end of the text, just take what's left + if end >= text_length: + chunks.append(text[start:].strip()) + break + + # Try to find a code block boundary first (```) + chunk = text[start:end] + code_block = chunk.rfind('```') + if code_block != -1 and code_block > chunk_size * 0.3: + end = start + code_block + + # If no code block, try to break at 
a paragraph + elif '\n\n' in chunk: + # Find the last paragraph break + last_break = chunk.rfind('\n\n') + if last_break > chunk_size * 0.3: # Only break if we're past 30% of chunk_size + end = start + last_break + + # If no paragraph break, try to break at a sentence + elif '. ' in chunk: + # Find the last sentence break + last_period = chunk.rfind('. ') + if last_period > chunk_size * 0.3: # Only break if we're past 30% of chunk_size + end = start + last_period + 1 + + # Extract chunk and clean it up + chunk = text[start:end].strip() + if chunk: + chunks.append(chunk) + + # Move start position for next chunk + start = max(start + 1, end) + + return chunks + +async def get_title_and_summary(chunk: str, url: str) -> Dict[str, str]: + """Extract title and summary using GPT-4.""" + system_prompt = """You are an AI that extracts titles and summaries from documentation chunks. + Return a JSON object with 'title' and 'summary' keys. + For the title: If this seems like the start of a document, extract its title. If it's a middle chunk, derive a descriptive title. + For the summary: Create a concise summary of the main points in this chunk. + Keep both title and summary concise but informative.""" + + try: + response = await openai_client.chat.completions.create( + model=os.getenv("LLM_MODEL", "gpt-4o-mini"), + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"URL: {url}\n\nContent:\n{chunk[:1000]}..."} # Send first 1000 chars for context + ], + response_format={ "type": "json_object" } + ) + return json.loads(response.choices[0].message.content) + except Exception as e: + print(f"Error getting title and summary: {e}") + return {"title": "Error processing title", "summary": "Error processing summary"} + +async def get_embedding(text: str) -> List[float]: + """Get embedding vector from OpenAI.""" + try: + response = await openai_client.embeddings.create( + model="text-embedding-3-small", + input=text + ) + return response.data[0].embedding + except Exception as e: + print(f"Error getting embedding: {e}") + return [0] * 1536 # Return zero vector on error + +async def process_chunk(chunk: str, chunk_number: int, url: str) -> ProcessedChunk: + """Process a single chunk of text.""" + # Get title and summary + extracted = await get_title_and_summary(chunk, url) + + # Get embedding + embedding = await get_embedding(chunk) + + # Create metadata + metadata = { + "source": "pydantic_ai_docs", + "chunk_size": len(chunk), + "crawled_at": datetime.now(timezone.utc).isoformat(), + "url_path": urlparse(url).path + } + + return ProcessedChunk( + url=url, + chunk_number=chunk_number, + title=extracted['title'], + summary=extracted['summary'], + content=chunk, # Store the original chunk content + metadata=metadata, + embedding=embedding + ) + +async def insert_chunk(chunk: ProcessedChunk): + """Insert a processed chunk into Supabase.""" + try: + data = { + "url": chunk.url, + "chunk_number": chunk.chunk_number, + "title": chunk.title, + "summary": chunk.summary, + "content": chunk.content, + "metadata": chunk.metadata, + "embedding": chunk.embedding + } + + result = supabase.table("site_pages").insert(data).execute() + print(f"Inserted chunk {chunk.chunk_number} for {chunk.url}") + return result + except Exception as e: + print(f"Error inserting chunk: {e}") + return None + +async def process_and_store_document(url: str, markdown: str): + """Process a document and store its chunks in parallel.""" + # Split into chunks + chunks = chunk_text(markdown) + + # Process chunks in 
parallel + tasks = [ + process_chunk(chunk, i, url) + for i, chunk in enumerate(chunks) + ] + processed_chunks = await asyncio.gather(*tasks) + + # Store chunks in parallel + insert_tasks = [ + insert_chunk(chunk) + for chunk in processed_chunks + ] + await asyncio.gather(*insert_tasks) + +async def crawl_parallel(urls: List[str], max_concurrent: int = 5): + """Crawl multiple URLs in parallel with a concurrency limit.""" + browser_config = BrowserConfig( + headless=True, + verbose=False, + extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"], + ) + crawl_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS) + + # Create the crawler instance + crawler = AsyncWebCrawler(config=browser_config) + await crawler.start() + + try: + # Create a semaphore to limit concurrency + semaphore = asyncio.Semaphore(max_concurrent) + + async def process_url(url: str): + async with semaphore: + result = await crawler.arun( + url=url, + config=crawl_config, + session_id="session1" + ) + if result.success: + print(f"Successfully crawled: {url}") + await process_and_store_document(url, result.markdown_v2.raw_markdown) + else: + print(f"Failed: {url} - Error: {result.error_message}") + + # Process all URLs in parallel with limited concurrency + await asyncio.gather(*[process_url(url) for url in urls]) + finally: + await crawler.close() + +def get_pydantic_ai_docs_urls() -> List[str]: + """Get URLs from Pydantic AI docs sitemap.""" + sitemap_url = "https://ai.pydantic.dev/sitemap.xml" + try: + response = requests.get(sitemap_url) + response.raise_for_status() + + # Parse the XML + root = ElementTree.fromstring(response.content) + + # Extract all URLs from the sitemap + namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'} + urls = [loc.text for loc in root.findall('.//ns:loc', namespace)] + + return urls + except Exception as e: + print(f"Error fetching sitemap: {e}") + return [] + +async def main(): + # Get URLs from Pydantic AI docs + urls = get_pydantic_ai_docs_urls() + if not urls: + print("No URLs found to crawl") + return + + print(f"Found {len(urls)} URLs to crawl") + await crawl_parallel(urls) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/iterations/v1-single-agent/.env.example b/iterations/v1-single-agent/.env.example new file mode 100644 index 0000000..8b89f4b --- /dev/null +++ b/iterations/v1-single-agent/.env.example @@ -0,0 +1,19 @@ +# Get your Open AI API Key by following these instructions - +# https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key +# You only need this environment variable set if you are using GPT (and not Ollama) +OPENAI_API_KEY= + +# For the Supabase version (sample_supabase_agent.py), set your Supabase URL and Service Key. +# Get your SUPABASE_URL from the API section of your Supabase project settings - +# https://supabase.com/dashboard/project//settings/api +SUPABASE_URL= + +# Get your SUPABASE_SERVICE_KEY from the API section of your Supabase project settings - +# https://supabase.com/dashboard/project//settings/api +# On this page it is called the service_role secret. +SUPABASE_SERVICE_KEY= + +# The LLM you want to use from OpenAI. 
See the list of models here: +# https://platform.openai.com/docs/models +# Example: gpt-4o-mini +LLM_MODEL= \ No newline at end of file diff --git a/iterations/v1-single-agent/README.md b/iterations/v1-single-agent/README.md new file mode 100644 index 0000000..163f123 --- /dev/null +++ b/iterations/v1-single-agent/README.md @@ -0,0 +1,122 @@ +# Archon V1 - Basic Pydantic AI Agent to Build other Pydantic AI Agents + +This is the first iteration of the Archon project - no use of LangGraph and built with a single AI agent to keep things very simple and introductory. + +An intelligent documentation crawler and RAG (Retrieval-Augmented Generation) agent built using Pydantic AI and Supabase that is capable of building other Pydantic AI agents. The agent crawls the Pydantic AI documentation, stores content in a vector database, and provides Pydantic AI agent code by retrieving and analyzing relevant documentation chunks. + +## Features + +- Pydantic AI documentation crawling and chunking +- Vector database storage with Supabase +- Semantic search using OpenAI embeddings +- RAG-based question answering +- Support for code block preservation +- Streamlit UI for interactive querying + +## Prerequisites + +- Python 3.11+ +- Supabase account and database +- OpenAI API key +- Streamlit (for web interface) + +## Installation + +1. Clone the repository: +```bash +git clone https://github.com/coleam00/archon.git +cd archon/iterations/v1-single-agent +``` + +2. Install dependencies (recommended to use a Python virtual environment): +```bash +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate +pip install -r requirements.txt +``` + +3. Set up environment variables: + - Rename `.env.example` to `.env` + - Edit `.env` with your API keys and preferences: + ```env + OPENAI_API_KEY=your_openai_api_key + SUPABASE_URL=your_supabase_url + SUPABASE_SERVICE_KEY=your_supabase_service_key + LLM_MODEL=gpt-4o-mini # or your preferred OpenAI model + ``` + +## Usage + +### Database Setup + +Execute the SQL commands in `site_pages.sql` to: +1. Create the necessary tables +2. Enable vector similarity search +3. Set up Row Level Security policies + +In Supabase, do this by going to the "SQL Editor" tab and pasting in the SQL into the editor there. Then click "Run". + +### Crawl Documentation + +To crawl and store documentation in the vector database: + +```bash +python crawl_pydantic_ai_docs.py +``` + +This will: +1. Fetch URLs from the documentation sitemap +2. Crawl each page and split into chunks +3. 
Generate embeddings and store in Supabase + +### Streamlit Web Interface + +For an interactive web interface to query the documentation: + +```bash +streamlit run streamlit_ui.py +``` + +The interface will be available at `http://localhost:8501` + +## Configuration + +### Database Schema + +The Supabase database uses the following schema: +```sql +CREATE TABLE site_pages ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + url TEXT, + chunk_number INTEGER, + title TEXT, + summary TEXT, + content TEXT, + metadata JSONB, + embedding VECTOR(1536) +); +``` + +### Chunking Configuration + +You can configure chunking parameters in `crawl_pydantic_ai_docs.py`: +```python +chunk_size = 5000 # Characters per chunk +``` + +The chunker intelligently preserves: +- Code blocks +- Paragraph boundaries +- Sentence boundaries + +## Project Structure + +- `crawl_pydantic_ai_docs.py`: Documentation crawler and processor +- `pydantic_ai_expert.py`: RAG agent implementation +- `streamlit_ui.py`: Web interface +- `site_pages.sql`: Database setup commands +- `requirements.txt`: Project dependencies + +## Contributing + +Contributions are welcome! Please feel free to submit a Pull Request. \ No newline at end of file diff --git a/iterations/v1-single-agent/crawl_pydantic_ai_docs.py b/iterations/v1-single-agent/crawl_pydantic_ai_docs.py new file mode 100644 index 0000000..81e897f --- /dev/null +++ b/iterations/v1-single-agent/crawl_pydantic_ai_docs.py @@ -0,0 +1,245 @@ +import os +import sys +import json +import asyncio +import requests +from xml.etree import ElementTree +from typing import List, Dict, Any +from dataclasses import dataclass +from datetime import datetime, timezone +from urllib.parse import urlparse +from dotenv import load_dotenv + +from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode +from openai import AsyncOpenAI +from supabase import create_client, Client + +load_dotenv() + +# Initialize OpenAI and Supabase clients +openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) +supabase: Client = create_client( + os.getenv("SUPABASE_URL"), + os.getenv("SUPABASE_SERVICE_KEY") +) + +@dataclass +class ProcessedChunk: + url: str + chunk_number: int + title: str + summary: str + content: str + metadata: Dict[str, Any] + embedding: List[float] + +def chunk_text(text: str, chunk_size: int = 5000) -> List[str]: + """Split text into chunks, respecting code blocks and paragraphs.""" + chunks = [] + start = 0 + text_length = len(text) + + while start < text_length: + # Calculate end position + end = start + chunk_size + + # If we're at the end of the text, just take what's left + if end >= text_length: + chunks.append(text[start:].strip()) + break + + # Try to find a code block boundary first (```) + chunk = text[start:end] + code_block = chunk.rfind('```') + if code_block != -1 and code_block > chunk_size * 0.3: + end = start + code_block + + # If no code block, try to break at a paragraph + elif '\n\n' in chunk: + # Find the last paragraph break + last_break = chunk.rfind('\n\n') + if last_break > chunk_size * 0.3: # Only break if we're past 30% of chunk_size + end = start + last_break + + # If no paragraph break, try to break at a sentence + elif '. ' in chunk: + # Find the last sentence break + last_period = chunk.rfind('. 
') + if last_period > chunk_size * 0.3: # Only break if we're past 30% of chunk_size + end = start + last_period + 1 + + # Extract chunk and clean it up + chunk = text[start:end].strip() + if chunk: + chunks.append(chunk) + + # Move start position for next chunk + start = max(start + 1, end) + + return chunks + +async def get_title_and_summary(chunk: str, url: str) -> Dict[str, str]: + """Extract title and summary using GPT-4.""" + system_prompt = """You are an AI that extracts titles and summaries from documentation chunks. + Return a JSON object with 'title' and 'summary' keys. + For the title: If this seems like the start of a document, extract its title. If it's a middle chunk, derive a descriptive title. + For the summary: Create a concise summary of the main points in this chunk. + Keep both title and summary concise but informative.""" + + try: + response = await openai_client.chat.completions.create( + model=os.getenv("LLM_MODEL", "gpt-4o-mini"), + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"URL: {url}\n\nContent:\n{chunk[:1000]}..."} # Send first 1000 chars for context + ], + response_format={ "type": "json_object" } + ) + return json.loads(response.choices[0].message.content) + except Exception as e: + print(f"Error getting title and summary: {e}") + return {"title": "Error processing title", "summary": "Error processing summary"} + +async def get_embedding(text: str) -> List[float]: + """Get embedding vector from OpenAI.""" + try: + response = await openai_client.embeddings.create( + model="text-embedding-3-small", + input=text + ) + return response.data[0].embedding + except Exception as e: + print(f"Error getting embedding: {e}") + return [0] * 1536 # Return zero vector on error + +async def process_chunk(chunk: str, chunk_number: int, url: str) -> ProcessedChunk: + """Process a single chunk of text.""" + # Get title and summary + extracted = await get_title_and_summary(chunk, url) + + # Get embedding + embedding = await get_embedding(chunk) + + # Create metadata + metadata = { + "source": "pydantic_ai_docs", + "chunk_size": len(chunk), + "crawled_at": datetime.now(timezone.utc).isoformat(), + "url_path": urlparse(url).path + } + + return ProcessedChunk( + url=url, + chunk_number=chunk_number, + title=extracted['title'], + summary=extracted['summary'], + content=chunk, # Store the original chunk content + metadata=metadata, + embedding=embedding + ) + +async def insert_chunk(chunk: ProcessedChunk): + """Insert a processed chunk into Supabase.""" + try: + data = { + "url": chunk.url, + "chunk_number": chunk.chunk_number, + "title": chunk.title, + "summary": chunk.summary, + "content": chunk.content, + "metadata": chunk.metadata, + "embedding": chunk.embedding + } + + result = supabase.table("site_pages").insert(data).execute() + print(f"Inserted chunk {chunk.chunk_number} for {chunk.url}") + return result + except Exception as e: + print(f"Error inserting chunk: {e}") + return None + +async def process_and_store_document(url: str, markdown: str): + """Process a document and store its chunks in parallel.""" + # Split into chunks + chunks = chunk_text(markdown) + + # Process chunks in parallel + tasks = [ + process_chunk(chunk, i, url) + for i, chunk in enumerate(chunks) + ] + processed_chunks = await asyncio.gather(*tasks) + + # Store chunks in parallel + insert_tasks = [ + insert_chunk(chunk) + for chunk in processed_chunks + ] + await asyncio.gather(*insert_tasks) + +async def crawl_parallel(urls: List[str], max_concurrent: int = 
5): + """Crawl multiple URLs in parallel with a concurrency limit.""" + browser_config = BrowserConfig( + headless=True, + verbose=False, + extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"], + ) + crawl_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS) + + # Create the crawler instance + crawler = AsyncWebCrawler(config=browser_config) + await crawler.start() + + try: + # Create a semaphore to limit concurrency + semaphore = asyncio.Semaphore(max_concurrent) + + async def process_url(url: str): + async with semaphore: + result = await crawler.arun( + url=url, + config=crawl_config, + session_id="session1" + ) + if result.success: + print(f"Successfully crawled: {url}") + await process_and_store_document(url, result.markdown_v2.raw_markdown) + else: + print(f"Failed: {url} - Error: {result.error_message}") + + # Process all URLs in parallel with limited concurrency + await asyncio.gather(*[process_url(url) for url in urls]) + finally: + await crawler.close() + +def get_pydantic_ai_docs_urls() -> List[str]: + """Get URLs from Pydantic AI docs sitemap.""" + sitemap_url = "https://ai.pydantic.dev/sitemap.xml" + try: + response = requests.get(sitemap_url) + response.raise_for_status() + + # Parse the XML + root = ElementTree.fromstring(response.content) + + # Extract all URLs from the sitemap + namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'} + urls = [loc.text for loc in root.findall('.//ns:loc', namespace)] + + return urls + except Exception as e: + print(f"Error fetching sitemap: {e}") + return [] + +async def main(): + # Get URLs from Pydantic AI docs + urls = get_pydantic_ai_docs_urls() + if not urls: + print("No URLs found to crawl") + return + + print(f"Found {len(urls)} URLs to crawl") + await crawl_parallel(urls) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/iterations/v1-single-agent/pydantic_ai_coder.py b/iterations/v1-single-agent/pydantic_ai_coder.py new file mode 100644 index 0000000..6f72db4 --- /dev/null +++ b/iterations/v1-single-agent/pydantic_ai_coder.py @@ -0,0 +1,193 @@ +from __future__ import annotations as _annotations + +from dataclasses import dataclass +from dotenv import load_dotenv +import logfire +import asyncio +import httpx +import os + +from pydantic_ai import Agent, ModelRetry, RunContext +from pydantic_ai.models.openai import OpenAIModel +from openai import AsyncOpenAI +from supabase import Client +from typing import List + +load_dotenv() + +llm = os.getenv('LLM_MODEL', 'gpt-4o-mini') +model = OpenAIModel(llm) + +logfire.configure(send_to_logfire='if-token-present') + +@dataclass +class PydanticAIDeps: + supabase: Client + openai_client: AsyncOpenAI + +system_prompt = """ +~~ CONTEXT: ~~ + +You are an expert at Pydantic AI - a Python AI agent framework that you have access to all the documentation to, +including examples, an API reference, and other resources to help you build Pydantic AI agents. + +~~ GOAL: ~~ + +Your only job is to help the user create an AI agent with Pydantic AI. +The user will describe the AI agent they want to build, or if they don't, guide them towards doing so. +You will take their requirements, and then search through the Pydantic AI documentation with the tools provided +to find all the necessary information to create the AI agent with correct code. + +It's important for you to search through multiple Pydantic AI documentation pages to get all the information you need. 
+Almost never stick to just one page - use RAG and the other documentation tools multiple times when you are creating +an AI agent from scratch for the user. + +~~ STRUCTURE: ~~ + +When you build an AI agent from scratch, split the agent into this files and give the code for each: +- `agent.py`: The main agent file, which is where the Pydantic AI agent is defined. +- `agent_tools.py`: A tools file for the agent, which is where all the tool functions are defined. Use this for more complex agents. +- `agent_prompts.py`: A prompts file for the agent, which includes all system prompts and other prompts used by the agent. Use this when there are many prompts or large ones. +- `.env.example`: An example `.env` file - specify each variable that the user will need to fill in and a quick comment above each one for how to do so. +- `requirements.txt`: Don't include any versions, just the top level package names needed for the agent. + +~~ INSTRUCTIONS: ~~ + +- Don't ask the user before taking an action, just do it. Always make sure you look at the documentation with the provided tools before writing any code. +- When you first look at the documentation, always start with RAG. +Then also always check the list of available documentation pages and retrieve the content of page(s) if it'll help. +- Always let the user know when you didn't find the answer in the documentation or the right URL - be honest. +- Helpful tip: when starting a new AI agent build, it's a good idea to look at the 'weather agent' in the docs as an example. +- When starting a new AI agent build, always produce the full code for the AI agent - never tell the user to finish a tool/function. +- When refining an existing AI agent build in a conversation, just share the code changes necessary. +""" + +pydantic_ai_coder = Agent( + model, + system_prompt=system_prompt, + deps_type=PydanticAIDeps, + retries=2 +) + +async def get_embedding(text: str, openai_client: AsyncOpenAI) -> List[float]: + """Get embedding vector from OpenAI.""" + try: + response = await openai_client.embeddings.create( + model="text-embedding-3-small", + input=text + ) + return response.data[0].embedding + except Exception as e: + print(f"Error getting embedding: {e}") + return [0] * 1536 # Return zero vector on error + +@pydantic_ai_coder.tool +async def retrieve_relevant_documentation(ctx: RunContext[PydanticAIDeps], user_query: str) -> str: + """ + Retrieve relevant documentation chunks based on the query with RAG. + + Args: + ctx: The context including the Supabase client and OpenAI client + user_query: The user's question or query + + Returns: + A formatted string containing the top 5 most relevant documentation chunks + """ + try: + # Get the embedding for the query + query_embedding = await get_embedding(user_query, ctx.deps.openai_client) + + # Query Supabase for relevant documents + result = ctx.deps.supabase.rpc( + 'match_site_pages', + { + 'query_embedding': query_embedding, + 'match_count': 5, + 'filter': {'source': 'pydantic_ai_docs'} + } + ).execute() + + if not result.data: + return "No relevant documentation found." 
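+        # Each row returned by the match_site_pages RPC (defined in site_pages.sql) is one of the
+        # top-5 chunks ranked by vector similarity; the loop below joins each chunk's title and
+        # content into a single markdown string for the agent to read.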
+ + # Format the results + formatted_chunks = [] + for doc in result.data: + chunk_text = f""" +# {doc['title']} + +{doc['content']} +""" + formatted_chunks.append(chunk_text) + + # Join all chunks with a separator + return "\n\n---\n\n".join(formatted_chunks) + + except Exception as e: + print(f"Error retrieving documentation: {e}") + return f"Error retrieving documentation: {str(e)}" + +@pydantic_ai_coder.tool +async def list_documentation_pages(ctx: RunContext[PydanticAIDeps]) -> List[str]: + """ + Retrieve a list of all available Pydantic AI documentation pages. + + Returns: + List[str]: List of unique URLs for all documentation pages + """ + try: + # Query Supabase for unique URLs where source is pydantic_ai_docs + result = ctx.deps.supabase.from_('site_pages') \ + .select('url') \ + .eq('metadata->>source', 'pydantic_ai_docs') \ + .execute() + + if not result.data: + return [] + + # Extract unique URLs + urls = sorted(set(doc['url'] for doc in result.data)) + return urls + + except Exception as e: + print(f"Error retrieving documentation pages: {e}") + return [] + +@pydantic_ai_coder.tool +async def get_page_content(ctx: RunContext[PydanticAIDeps], url: str) -> str: + """ + Retrieve the full content of a specific documentation page by combining all its chunks. + + Args: + ctx: The context including the Supabase client + url: The URL of the page to retrieve + + Returns: + str: The complete page content with all chunks combined in order + """ + try: + # Query Supabase for all chunks of this URL, ordered by chunk_number + result = ctx.deps.supabase.from_('site_pages') \ + .select('title, content, chunk_number') \ + .eq('url', url) \ + .eq('metadata->>source', 'pydantic_ai_docs') \ + .order('chunk_number') \ + .execute() + + if not result.data: + return f"No content found for URL: {url}" + + # Format the page with its title and all chunks + page_title = result.data[0]['title'].split(' - ')[0] # Get the main title + formatted_content = [f"# {page_title}\n"] + + # Add each chunk's content + for chunk in result.data: + formatted_content.append(chunk['content']) + + # Join everything together + return "\n\n".join(formatted_content) + + except Exception as e: + print(f"Error retrieving page content: {e}") + return f"Error retrieving page content: {str(e)}" \ No newline at end of file diff --git a/iterations/v1-single-agent/requirements.txt b/iterations/v1-single-agent/requirements.txt new file mode 100644 index 0000000..b89d76a Binary files /dev/null and b/iterations/v1-single-agent/requirements.txt differ diff --git a/iterations/v1-single-agent/site_pages.sql b/iterations/v1-single-agent/site_pages.sql new file mode 100644 index 0000000..6354669 --- /dev/null +++ b/iterations/v1-single-agent/site_pages.sql @@ -0,0 +1,72 @@ +-- Enable the pgvector extension +create extension if not exists vector; + +-- Create the documentation chunks table +create table site_pages ( + id bigserial primary key, + url varchar not null, + chunk_number integer not null, + title varchar not null, + summary varchar not null, + content text not null, -- Added content column + metadata jsonb not null default '{}'::jsonb, -- Added metadata column + embedding vector(1536), -- OpenAI embeddings are 1536 dimensions + created_at timestamp with time zone default timezone('utc'::text, now()) not null, + + -- Add a unique constraint to prevent duplicate chunks for the same URL + unique(url, chunk_number) +); + +-- Create an index for better vector similarity search performance +create index on site_pages using ivfflat 
(embedding vector_cosine_ops); + +-- Create an index on metadata for faster filtering +create index idx_site_pages_metadata on site_pages using gin (metadata); + +-- Create a function to search for documentation chunks +create function match_site_pages ( + query_embedding vector(1536), + match_count int default 10, + filter jsonb DEFAULT '{}'::jsonb +) returns table ( + id bigint, + url varchar, + chunk_number integer, + title varchar, + summary varchar, + content text, + metadata jsonb, + similarity float +) +language plpgsql +as $$ +#variable_conflict use_column +begin + return query + select + id, + url, + chunk_number, + title, + summary, + content, + metadata, + 1 - (site_pages.embedding <=> query_embedding) as similarity + from site_pages + where metadata @> filter + order by site_pages.embedding <=> query_embedding + limit match_count; +end; +$$; + +-- Everything above will work for any PostgreSQL database. The below commands are for Supabase security + +-- Enable RLS on the table +alter table site_pages enable row level security; + +-- Create a policy that allows anyone to read +create policy "Allow public read access" + on site_pages + for select + to public + using (true); \ No newline at end of file diff --git a/iterations/v1-single-agent/streamlit_ui.py b/iterations/v1-single-agent/streamlit_ui.py new file mode 100644 index 0000000..e9fcce2 --- /dev/null +++ b/iterations/v1-single-agent/streamlit_ui.py @@ -0,0 +1,143 @@ +from __future__ import annotations +from typing import Literal, TypedDict +import asyncio +import os + +import streamlit as st +import json +import logfire +from supabase import Client +from openai import AsyncOpenAI + +# Import all the message part classes +from pydantic_ai.messages import ( + ModelMessage, + ModelRequest, + ModelResponse, + SystemPromptPart, + UserPromptPart, + TextPart, + ToolCallPart, + ToolReturnPart, + RetryPromptPart, + ModelMessagesTypeAdapter +) +from pydantic_ai_coder import pydantic_ai_coder, PydanticAIDeps + +# Load environment variables +from dotenv import load_dotenv +load_dotenv() + +openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) +supabase: Client = Client( + os.getenv("SUPABASE_URL"), + os.getenv("SUPABASE_SERVICE_KEY") +) + +# Configure logfire to suppress warnings (optional) +logfire.configure(send_to_logfire='never') + +class ChatMessage(TypedDict): + """Format of messages sent to the browser/API.""" + + role: Literal['user', 'model'] + timestamp: str + content: str + + +def display_message_part(part): + """ + Display a single part of a message in the Streamlit UI. + Customize how you display system prompts, user prompts, + tool calls, tool returns, etc. + """ + # system-prompt + if part.part_kind == 'system-prompt': + with st.chat_message("system"): + st.markdown(f"**System**: {part.content}") + # user-prompt + elif part.part_kind == 'user-prompt': + with st.chat_message("user"): + st.markdown(part.content) + # text + elif part.part_kind == 'text': + with st.chat_message("assistant"): + st.markdown(part.content) + + +async def run_agent_with_streaming(user_input: str): + """ + Run the agent with streaming text for the user_input prompt, + while maintaining the entire conversation in `st.session_state.messages`. 
+ """ + # Prepare dependencies + deps = PydanticAIDeps( + supabase=supabase, + openai_client=openai_client + ) + + # Run the agent in a stream + async with pydantic_ai_coder.run_stream( + user_input, + deps=deps, + message_history= st.session_state.messages[:-1], # pass entire conversation so far + ) as result: + # We'll gather partial text to show incrementally + partial_text = "" + message_placeholder = st.empty() + + # Render partial text as it arrives + async for chunk in result.stream_text(delta=True): + partial_text += chunk + message_placeholder.markdown(partial_text) + + # Now that the stream is finished, we have a final result. + # Add new messages from this run, excluding user-prompt messages + filtered_messages = [msg for msg in result.new_messages() + if not (hasattr(msg, 'parts') and + any(part.part_kind == 'user-prompt' for part in msg.parts))] + st.session_state.messages.extend(filtered_messages) + + # Add the final response to the messages + st.session_state.messages.append( + ModelResponse(parts=[TextPart(content=partial_text)]) + ) + + +async def main(): + st.title("Archon - Agent Builder") + st.write("Describe to me an AI agent you want to build and I'll code it for you with Pydantic AI.") + + # Initialize chat history in session state if not present + if "messages" not in st.session_state: + st.session_state.messages = [] + + # Display all messages from the conversation so far + # Each message is either a ModelRequest or ModelResponse. + # We iterate over their parts to decide how to display them. + for msg in st.session_state.messages: + if isinstance(msg, ModelRequest) or isinstance(msg, ModelResponse): + for part in msg.parts: + display_message_part(part) + + # Chat input for the user + user_input = st.chat_input("What do you want to build today?") + + if user_input: + # We append a new request to the conversation explicitly + st.session_state.messages.append( + ModelRequest(parts=[UserPromptPart(content=user_input)]) + ) + + # Display user prompt in the UI + with st.chat_message("user"): + st.markdown(user_input) + + # Display the assistant's partial response while streaming + with st.chat_message("assistant"): + # Actually run the agent now, streaming the text + await run_agent_with_streaming(user_input) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/iterations/v2-agentic-workflow/.env.example b/iterations/v2-agentic-workflow/.env.example new file mode 100644 index 0000000..56b6e10 --- /dev/null +++ b/iterations/v2-agentic-workflow/.env.example @@ -0,0 +1,33 @@ +# Base URL for the OpenAI instance (default is https://api.openai.com/v1) +# OpenAI: https://api.openai.com/v1 +# Ollama (example): http://localhost:11434/v1 +# OpenRouter: https://openrouter.ai/api/v1 +BASE_URL= + +# Get your Open AI API Key by following these instructions - +# https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key +# Even if using OpenRouter/Ollama, you still need to set this for the embedding model. +# Future versions of Archon will be more flexible with this. +OPENAI_API_KEY= + +# For OpenAI: https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key +# For OpenRouter: https://openrouter.ai/keys +LLM_API_KEY= + +# For the Supabase version (sample_supabase_agent.py), set your Supabase URL and Service Key. 
+# Get your SUPABASE_URL from the API section of your Supabase project settings - +# https://supabase.com/dashboard/project//settings/api +SUPABASE_URL= + +# Get your SUPABASE_SERVICE_KEY from the API section of your Supabase project settings - +# https://supabase.com/dashboard/project//settings/api +# On this page it is called the service_role secret. +SUPABASE_SERVICE_KEY= + +# The LLM you want to use for the reasoner (o3-mini, R1, QwQ, etc.). +# Example: o3-mini +REASONER_MODEL= + +# The LLM you want to use for the primary agent/coder. +# Example: gpt-4o-mini +PRIMARY_MODEL= \ No newline at end of file diff --git a/iterations/v2-agentic-workflow/README.md b/iterations/v2-agentic-workflow/README.md new file mode 100644 index 0000000..e21e5a8 --- /dev/null +++ b/iterations/v2-agentic-workflow/README.md @@ -0,0 +1,132 @@ +# Archon V2 - Agentic Workflow for Building Pydantic AI Agents + +This is the second iteration of the Archon project, building upon V1 by introducing LangGraph for a full agentic workflow. The system starts with a reasoning LLM (like O3-mini or R1) that analyzes user requirements and documentation to create a detailed scope, which then guides specialized coding and routing agents in generating high-quality Pydantic AI agents. + +An intelligent documentation crawler and RAG (Retrieval-Augmented Generation) system built using Pydantic AI, LangGraph, and Supabase that is capable of building other Pydantic AI agents. The system crawls the Pydantic AI documentation, stores content in a vector database, and provides Pydantic AI agent code by retrieving and analyzing relevant documentation chunks. + +This version also supports local LLMs with Ollama for the main agent and reasoning LLM. + +Note that we are still relying on OpenAI for embeddings no matter what, but future versions of Archon will change that. + +## Features + +- Multi-agent workflow using LangGraph +- Specialized agents for reasoning, routing, and coding +- Pydantic AI documentation crawling and chunking +- Vector database storage with Supabase +- Semantic search using OpenAI embeddings +- RAG-based question answering +- Support for code block preservation +- Streamlit UI for interactive querying + +## Prerequisites + +- Python 3.11+ +- Supabase account and database +- OpenAI/OpenRouter API key or Ollama for local LLMs +- Streamlit (for web interface) + +## Installation + +1. Clone the repository: +```bash +git clone https://github.com/coleam00/archon.git +cd archon/iterations/v2-agentic-workflow +``` + +2. Install dependencies (recommended to use a Python virtual environment): +```bash +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate +pip install -r requirements.txt +``` + +3. Set up environment variables: + - Rename `.env.example` to `.env` + - Edit `.env` with your API keys and preferences: + ```env + BASE_URL=https://api.openai.com/v1 for OpenAI, https://api.openrouter.ai/v1 for OpenRouter, or your Ollama URL + LLM_API_KEY=your_openai_or_openrouter_api_key + OPENAI_API_KEY=your_openai_api_key + SUPABASE_URL=your_supabase_url + SUPABASE_SERVICE_KEY=your_supabase_service_key + PRIMARY_MODEL=gpt-4o-mini # or your preferred OpenAI model for main agent + REASONER_MODEL=o3-mini # or your preferred OpenAI model for reasoning + ``` + +## Usage + +### Database Setup + +Execute the SQL commands in `site_pages.sql` to: +1. Create the necessary tables +2. Enable vector similarity search +3. 
Set up Row Level Security policies + +In Supabase, do this by going to the "SQL Editor" tab and pasting in the SQL into the editor there. Then click "Run". + +### Crawl Documentation + +To crawl and store documentation in the vector database: + +```bash +python crawl_pydantic_ai_docs.py +``` + +This will: +1. Fetch URLs from the documentation sitemap +2. Crawl each page and split into chunks +3. Generate embeddings and store in Supabase + +### Chunking Configuration + +You can configure chunking parameters in `crawl_pydantic_ai_docs.py`: +```python +chunk_size = 5000 # Characters per chunk +``` + +The chunker intelligently preserves: +- Code blocks +- Paragraph boundaries +- Sentence boundaries + +### Streamlit Web Interface + +For an interactive web interface to query the documentation and create agents: + +```bash +streamlit run streamlit_ui.py +``` + +The interface will be available at `http://localhost:8501` + +## Configuration + +### Database Schema + +The Supabase database uses the following schema: +```sql +CREATE TABLE site_pages ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + url TEXT, + chunk_number INTEGER, + title TEXT, + summary TEXT, + content TEXT, + metadata JSONB, + embedding VECTOR(1536) +); +``` + +## Project Structure + +- `archon_graph.py`: LangGraph workflow definition and agent coordination +- `pydantic_ai_coder.py`: Main coding agent with RAG capabilities +- `crawl_pydantic_ai_docs.py`: Documentation crawler and processor +- `streamlit_ui.py`: Web interface with streaming support +- `site_pages.sql`: Database setup commands +- `requirements.txt`: Project dependencies + +## Contributing + +Contributions are welcome! Please feel free to submit a Pull Request. diff --git a/iterations/v2-agentic-workflow/archon_graph.py b/iterations/v2-agentic-workflow/archon_graph.py new file mode 100644 index 0000000..42dffc4 --- /dev/null +++ b/iterations/v2-agentic-workflow/archon_graph.py @@ -0,0 +1,201 @@ +from pydantic_ai.models.openai import OpenAIModel +from pydantic_ai import Agent, RunContext +from langgraph.graph import StateGraph, START, END +from langgraph.checkpoint.memory import MemorySaver +from typing import TypedDict, Annotated, List, Any +from langgraph.config import get_stream_writer +from langgraph.types import interrupt +from dotenv import load_dotenv +from openai import AsyncOpenAI +from supabase import Client +import logfire +import os + +# Import the message classes from Pydantic AI +from pydantic_ai.messages import ( + ModelMessage, + ModelMessagesTypeAdapter +) + +from pydantic_ai_coder import pydantic_ai_coder, PydanticAIDeps, list_documentation_pages_helper + +# Load environment variables +load_dotenv() + +# Configure logfire to suppress warnings (optional) +logfire.configure(send_to_logfire='never') + +base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1') +api_key = os.getenv('LLM_API_KEY', 'no-llm-api-key-provided') +is_ollama = "localhost" in base_url.lower() +reasoner_llm_model = os.getenv('REASONER_MODEL', 'o3-mini') +reasoner = Agent( + OpenAIModel(reasoner_llm_model, base_url=base_url, api_key=api_key), + system_prompt='You are an expert at coding AI agents with Pydantic AI and defining the scope for doing so.', +) + +primary_llm_model = os.getenv('PRIMARY_MODEL', 'gpt-4o-mini') +router_agent = Agent( + OpenAIModel(primary_llm_model, base_url=base_url, api_key=api_key), + system_prompt='Your job is to route the user message either to the end of the conversation or to continue coding the AI agent.', +) + +end_conversation_agent = Agent( 
+ OpenAIModel(primary_llm_model, base_url=base_url, api_key=api_key), + system_prompt='Your job is to end a conversation for creating an AI agent by giving instructions for how to execute the agent and they saying a nice goodbye to the user.', +) + +openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) +supabase: Client = Client( + os.getenv("SUPABASE_URL"), + os.getenv("SUPABASE_SERVICE_KEY") +) + +# Define state schema +class AgentState(TypedDict): + latest_user_message: str + messages: Annotated[List[bytes], lambda x, y: x + y] + scope: str + +# Scope Definition Node with Reasoner LLM +async def define_scope_with_reasoner(state: AgentState): + # First, get the documentation pages so the reasoner can decide which ones are necessary + documentation_pages = await list_documentation_pages_helper(supabase) + documentation_pages_str = "\n".join(documentation_pages) + + # Then, use the reasoner to define the scope + prompt = f""" + User AI Agent Request: {state['latest_user_message']} + + Create detailed scope document for the AI agent including: + - Architecture diagram + - Core components + - External dependencies + - Testing strategy + + Also based on these documentation pages available: + + {documentation_pages_str} + + Include a list of documentation pages that are relevant to creating this agent for the user in the scope document. + """ + + result = await reasoner.run(prompt) + scope = result.data + + # Save the scope to a file + scope_path = os.path.join("workbench", "scope.md") + os.makedirs("workbench", exist_ok=True) + + with open(scope_path, "w", encoding="utf-8") as f: + f.write(scope) + + return {"scope": scope} + +# Coding Node with Feedback Handling +async def coder_agent(state: AgentState, writer): + # Prepare dependencies + deps = PydanticAIDeps( + supabase=supabase, + openai_client=openai_client, + reasoner_output=state['scope'] + ) + + # Get the message history into the format for Pydantic AI + message_history: list[ModelMessage] = [] + for message_row in state['messages']: + message_history.extend(ModelMessagesTypeAdapter.validate_json(message_row)) + + # Run the agent in a stream + if is_ollama: + writer = get_stream_writer() + result = await pydantic_ai_coder.run(state['latest_user_message'], deps=deps, message_history= message_history) + writer(result.data) + else: + async with pydantic_ai_coder.run_stream( + state['latest_user_message'], + deps=deps, + message_history= message_history + ) as result: + # Stream partial text as it arrives + async for chunk in result.stream_text(delta=True): + writer(chunk) + + # print(ModelMessagesTypeAdapter.validate_json(result.new_messages_json())) + + return {"messages": [result.new_messages_json()]} + +# Interrupt the graph to get the user's next message +def get_next_user_message(state: AgentState): + value = interrupt({}) + + # Set the user's latest message for the LLM to continue the conversation + return { + "latest_user_message": value + } + +# Determine if the user is finished creating their AI agent or not +async def route_user_message(state: AgentState): + prompt = f""" + The user has sent a message: + + {state['latest_user_message']} + + If the user wants to end the conversation, respond with just the text "finish_conversation". + If the user wants to continue coding the AI agent, respond with just the text "coder_agent". 
+ """ + + result = await router_agent.run(prompt) + next_action = result.data + + if next_action == "finish_conversation": + return "finish_conversation" + else: + return "coder_agent" + +# End of conversation agent to give instructions for executing the agent +async def finish_conversation(state: AgentState, writer): + # Get the message history into the format for Pydantic AI + message_history: list[ModelMessage] = [] + for message_row in state['messages']: + message_history.extend(ModelMessagesTypeAdapter.validate_json(message_row)) + + # Run the agent in a stream + if is_ollama: + writer = get_stream_writer() + result = await end_conversation_agent.run(state['latest_user_message'], message_history= message_history) + writer(result.data) + else: + async with end_conversation_agent.run_stream( + state['latest_user_message'], + message_history= message_history + ) as result: + # Stream partial text as it arrives + async for chunk in result.stream_text(delta=True): + writer(chunk) + + return {"messages": [result.new_messages_json()]} + +# Build workflow +builder = StateGraph(AgentState) + +# Add nodes +builder.add_node("define_scope_with_reasoner", define_scope_with_reasoner) +builder.add_node("coder_agent", coder_agent) +builder.add_node("get_next_user_message", get_next_user_message) +builder.add_node("finish_conversation", finish_conversation) + +# Set edges +builder.add_edge(START, "define_scope_with_reasoner") +builder.add_edge("define_scope_with_reasoner", "coder_agent") +builder.add_edge("coder_agent", "get_next_user_message") +builder.add_conditional_edges( + "get_next_user_message", + route_user_message, + {"coder_agent": "coder_agent", "finish_conversation": "finish_conversation"} +) +builder.add_edge("finish_conversation", END) + +# Configure persistence +memory = MemorySaver() +agentic_flow = builder.compile(checkpointer=memory) \ No newline at end of file diff --git a/iterations/v2-agentic-workflow/crawl_pydantic_ai_docs.py b/iterations/v2-agentic-workflow/crawl_pydantic_ai_docs.py new file mode 100644 index 0000000..81e897f --- /dev/null +++ b/iterations/v2-agentic-workflow/crawl_pydantic_ai_docs.py @@ -0,0 +1,245 @@ +import os +import sys +import json +import asyncio +import requests +from xml.etree import ElementTree +from typing import List, Dict, Any +from dataclasses import dataclass +from datetime import datetime, timezone +from urllib.parse import urlparse +from dotenv import load_dotenv + +from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode +from openai import AsyncOpenAI +from supabase import create_client, Client + +load_dotenv() + +# Initialize OpenAI and Supabase clients +openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) +supabase: Client = create_client( + os.getenv("SUPABASE_URL"), + os.getenv("SUPABASE_SERVICE_KEY") +) + +@dataclass +class ProcessedChunk: + url: str + chunk_number: int + title: str + summary: str + content: str + metadata: Dict[str, Any] + embedding: List[float] + +def chunk_text(text: str, chunk_size: int = 5000) -> List[str]: + """Split text into chunks, respecting code blocks and paragraphs.""" + chunks = [] + start = 0 + text_length = len(text) + + while start < text_length: + # Calculate end position + end = start + chunk_size + + # If we're at the end of the text, just take what's left + if end >= text_length: + chunks.append(text[start:].strip()) + break + + # Try to find a code block boundary first (```) + chunk = text[start:end] + code_block = chunk.rfind('```') + if code_block != -1 and 
code_block > chunk_size * 0.3: + end = start + code_block + + # If no code block, try to break at a paragraph + elif '\n\n' in chunk: + # Find the last paragraph break + last_break = chunk.rfind('\n\n') + if last_break > chunk_size * 0.3: # Only break if we're past 30% of chunk_size + end = start + last_break + + # If no paragraph break, try to break at a sentence + elif '. ' in chunk: + # Find the last sentence break + last_period = chunk.rfind('. ') + if last_period > chunk_size * 0.3: # Only break if we're past 30% of chunk_size + end = start + last_period + 1 + + # Extract chunk and clean it up + chunk = text[start:end].strip() + if chunk: + chunks.append(chunk) + + # Move start position for next chunk + start = max(start + 1, end) + + return chunks + +async def get_title_and_summary(chunk: str, url: str) -> Dict[str, str]: + """Extract title and summary using GPT-4.""" + system_prompt = """You are an AI that extracts titles and summaries from documentation chunks. + Return a JSON object with 'title' and 'summary' keys. + For the title: If this seems like the start of a document, extract its title. If it's a middle chunk, derive a descriptive title. + For the summary: Create a concise summary of the main points in this chunk. + Keep both title and summary concise but informative.""" + + try: + response = await openai_client.chat.completions.create( + model=os.getenv("LLM_MODEL", "gpt-4o-mini"), + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"URL: {url}\n\nContent:\n{chunk[:1000]}..."} # Send first 1000 chars for context + ], + response_format={ "type": "json_object" } + ) + return json.loads(response.choices[0].message.content) + except Exception as e: + print(f"Error getting title and summary: {e}") + return {"title": "Error processing title", "summary": "Error processing summary"} + +async def get_embedding(text: str) -> List[float]: + """Get embedding vector from OpenAI.""" + try: + response = await openai_client.embeddings.create( + model="text-embedding-3-small", + input=text + ) + return response.data[0].embedding + except Exception as e: + print(f"Error getting embedding: {e}") + return [0] * 1536 # Return zero vector on error + +async def process_chunk(chunk: str, chunk_number: int, url: str) -> ProcessedChunk: + """Process a single chunk of text.""" + # Get title and summary + extracted = await get_title_and_summary(chunk, url) + + # Get embedding + embedding = await get_embedding(chunk) + + # Create metadata + metadata = { + "source": "pydantic_ai_docs", + "chunk_size": len(chunk), + "crawled_at": datetime.now(timezone.utc).isoformat(), + "url_path": urlparse(url).path + } + + return ProcessedChunk( + url=url, + chunk_number=chunk_number, + title=extracted['title'], + summary=extracted['summary'], + content=chunk, # Store the original chunk content + metadata=metadata, + embedding=embedding + ) + +async def insert_chunk(chunk: ProcessedChunk): + """Insert a processed chunk into Supabase.""" + try: + data = { + "url": chunk.url, + "chunk_number": chunk.chunk_number, + "title": chunk.title, + "summary": chunk.summary, + "content": chunk.content, + "metadata": chunk.metadata, + "embedding": chunk.embedding + } + + result = supabase.table("site_pages").insert(data).execute() + print(f"Inserted chunk {chunk.chunk_number} for {chunk.url}") + return result + except Exception as e: + print(f"Error inserting chunk: {e}") + return None + +async def process_and_store_document(url: str, markdown: str): + """Process a document and store its 
chunks in parallel.""" + # Split into chunks + chunks = chunk_text(markdown) + + # Process chunks in parallel + tasks = [ + process_chunk(chunk, i, url) + for i, chunk in enumerate(chunks) + ] + processed_chunks = await asyncio.gather(*tasks) + + # Store chunks in parallel + insert_tasks = [ + insert_chunk(chunk) + for chunk in processed_chunks + ] + await asyncio.gather(*insert_tasks) + +async def crawl_parallel(urls: List[str], max_concurrent: int = 5): + """Crawl multiple URLs in parallel with a concurrency limit.""" + browser_config = BrowserConfig( + headless=True, + verbose=False, + extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"], + ) + crawl_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS) + + # Create the crawler instance + crawler = AsyncWebCrawler(config=browser_config) + await crawler.start() + + try: + # Create a semaphore to limit concurrency + semaphore = asyncio.Semaphore(max_concurrent) + + async def process_url(url: str): + async with semaphore: + result = await crawler.arun( + url=url, + config=crawl_config, + session_id="session1" + ) + if result.success: + print(f"Successfully crawled: {url}") + await process_and_store_document(url, result.markdown_v2.raw_markdown) + else: + print(f"Failed: {url} - Error: {result.error_message}") + + # Process all URLs in parallel with limited concurrency + await asyncio.gather(*[process_url(url) for url in urls]) + finally: + await crawler.close() + +def get_pydantic_ai_docs_urls() -> List[str]: + """Get URLs from Pydantic AI docs sitemap.""" + sitemap_url = "https://ai.pydantic.dev/sitemap.xml" + try: + response = requests.get(sitemap_url) + response.raise_for_status() + + # Parse the XML + root = ElementTree.fromstring(response.content) + + # Extract all URLs from the sitemap + namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'} + urls = [loc.text for loc in root.findall('.//ns:loc', namespace)] + + return urls + except Exception as e: + print(f"Error fetching sitemap: {e}") + return [] + +async def main(): + # Get URLs from Pydantic AI docs + urls = get_pydantic_ai_docs_urls() + if not urls: + print("No URLs found to crawl") + return + + print(f"Found {len(urls)} URLs to crawl") + await crawl_parallel(urls) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/iterations/v2-agentic-workflow/langgraph.json b/iterations/v2-agentic-workflow/langgraph.json new file mode 100644 index 0000000..7dfa2d5 --- /dev/null +++ b/iterations/v2-agentic-workflow/langgraph.json @@ -0,0 +1,7 @@ +{ + "dependencies": ["."], + "graphs": { + "agent": "./archon_graph.py:agentic_flow" + }, + "env": ".env" +} diff --git a/iterations/v2-agentic-workflow/pydantic_ai_coder.py b/iterations/v2-agentic-workflow/pydantic_ai_coder.py new file mode 100644 index 0000000..af39a24 --- /dev/null +++ b/iterations/v2-agentic-workflow/pydantic_ai_coder.py @@ -0,0 +1,219 @@ +from __future__ import annotations as _annotations + +from dataclasses import dataclass +from dotenv import load_dotenv +import logfire +import asyncio +import httpx +import os + +from pydantic_ai import Agent, ModelRetry, RunContext +from pydantic_ai.models.openai import OpenAIModel +from openai import AsyncOpenAI +from supabase import Client +from typing import List + +load_dotenv() + +llm = os.getenv('PRIMARY_MODEL', 'gpt-4o-mini') +base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1') +api_key = os.getenv('LLM_API_KEY', 'no-llm-api-key-provided') +model = OpenAIModel(llm, base_url=base_url, api_key=api_key) + 
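# Note: PRIMARY_MODEL, BASE_URL and LLM_API_KEY are resolved once here at
# import time (after load_dotenv() above), so the .env file must be filled in
# before archon_graph.py imports this module.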
+logfire.configure(send_to_logfire='if-token-present') + +@dataclass +class PydanticAIDeps: + supabase: Client + openai_client: AsyncOpenAI + reasoner_output: str + +system_prompt = """ +~~ CONTEXT: ~~ + +You are an expert at Pydantic AI - a Python AI agent framework that you have access to all the documentation to, +including examples, an API reference, and other resources to help you build Pydantic AI agents. + +~~ GOAL: ~~ + +Your only job is to help the user create an AI agent with Pydantic AI. +The user will describe the AI agent they want to build, or if they don't, guide them towards doing so. +You will take their requirements, and then search through the Pydantic AI documentation with the tools provided +to find all the necessary information to create the AI agent with correct code. + +It's important for you to search through multiple Pydantic AI documentation pages to get all the information you need. +Almost never stick to just one page - use RAG and the other documentation tools multiple times when you are creating +an AI agent from scratch for the user. + +~~ STRUCTURE: ~~ + +When you build an AI agent from scratch, split the agent into this files and give the code for each: +- `agent.py`: The main agent file, which is where the Pydantic AI agent is defined. +- `agent_tools.py`: A tools file for the agent, which is where all the tool functions are defined. Use this for more complex agents. +- `agent_prompts.py`: A prompts file for the agent, which includes all system prompts and other prompts used by the agent. Use this when there are many prompts or large ones. +- `.env.example`: An example `.env` file - specify each variable that the user will need to fill in and a quick comment above each one for how to do so. +- `requirements.txt`: Don't include any versions, just the top level package names needed for the agent. + +~~ INSTRUCTIONS: ~~ + +- Don't ask the user before taking an action, just do it. Always make sure you look at the documentation with the provided tools before writing any code. +- When you first look at the documentation, always start with RAG. +Then also always check the list of available documentation pages and retrieve the content of page(s) if it'll help. +- Always let the user know when you didn't find the answer in the documentation or the right URL - be honest. +- Helpful tip: when starting a new AI agent build, it's a good idea to look at the 'weather agent' in the docs as an example. +- When starting a new AI agent build, always produce the full code for the AI agent - never tell the user to finish a tool/function. +- When refining an existing AI agent build in a conversation, just share the code changes necessary. +- Each time you respond to the user, ask them to let you know either if they need changes or the code looks good. +""" + +pydantic_ai_coder = Agent( + model, + system_prompt=system_prompt, + deps_type=PydanticAIDeps, + retries=2 +) + +@pydantic_ai_coder.system_prompt +def add_reasoner_output(ctx: RunContext[str]) -> str: + return f""" + \n\nAdditional thoughts/instructions from the reasoner LLM. + This scope includes documentation pages for you to search as well: + {ctx.deps.reasoner_output} + """ + + # Add this in to get some crazy tool calling: + # You must get ALL documentation pages listed in the scope. 
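# --- Illustrative usage sketch (not called anywhere in this project) ---------
# A minimal example of how this agent is meant to be driven: build the
# PydanticAIDeps that the tools below rely on, then run the agent with a user
# request. The Supabase and OpenAI clients are assumed to be constructed the
# same way archon_graph.py constructs them; `scope_text` stands in for the
# reasoner's scope document.
async def _example_run_coder(supabase: Client, openai_client: AsyncOpenAI, scope_text: str) -> str:
    deps = PydanticAIDeps(
        supabase=supabase,
        openai_client=openai_client,
        reasoner_output=scope_text,
    )
    result = await pydantic_ai_coder.run(
        "Build me an AI agent that can search the web with the Brave API.",
        deps=deps,
    )
    return result.data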
+ +async def get_embedding(text: str, openai_client: AsyncOpenAI) -> List[float]: + """Get embedding vector from OpenAI.""" + try: + response = await openai_client.embeddings.create( + model="text-embedding-3-small", + input=text + ) + return response.data[0].embedding + except Exception as e: + print(f"Error getting embedding: {e}") + return [0] * 1536 # Return zero vector on error + +@pydantic_ai_coder.tool +async def retrieve_relevant_documentation(ctx: RunContext[PydanticAIDeps], user_query: str) -> str: + """ + Retrieve relevant documentation chunks based on the query with RAG. + + Args: + ctx: The context including the Supabase client and OpenAI client + user_query: The user's question or query + + Returns: + A formatted string containing the top 5 most relevant documentation chunks + """ + try: + # Get the embedding for the query + query_embedding = await get_embedding(user_query, ctx.deps.openai_client) + + # Query Supabase for relevant documents + result = ctx.deps.supabase.rpc( + 'match_site_pages', + { + 'query_embedding': query_embedding, + 'match_count': 5, + 'filter': {'source': 'pydantic_ai_docs'} + } + ).execute() + + if not result.data: + return "No relevant documentation found." + + # Format the results + formatted_chunks = [] + for doc in result.data: + chunk_text = f""" +# {doc['title']} + +{doc['content']} +""" + formatted_chunks.append(chunk_text) + + # Join all chunks with a separator + return "\n\n---\n\n".join(formatted_chunks) + + except Exception as e: + print(f"Error retrieving documentation: {e}") + return f"Error retrieving documentation: {str(e)}" + +async def list_documentation_pages_helper(supabase: Client) -> List[str]: + """ + Function to retrieve a list of all available Pydantic AI documentation pages. + This is called by the list_documentation_pages tool and also externally + to fetch documentation pages for the reasoner LLM. + + Returns: + List[str]: List of unique URLs for all documentation pages + """ + try: + # Query Supabase for unique URLs where source is pydantic_ai_docs + result = supabase.from_('site_pages') \ + .select('url') \ + .eq('metadata->>source', 'pydantic_ai_docs') \ + .execute() + + if not result.data: + return [] + + # Extract unique URLs + urls = sorted(set(doc['url'] for doc in result.data)) + return urls + + except Exception as e: + print(f"Error retrieving documentation pages: {e}") + return [] + +@pydantic_ai_coder.tool +async def list_documentation_pages(ctx: RunContext[PydanticAIDeps]) -> List[str]: + """ + Retrieve a list of all available Pydantic AI documentation pages. + + Returns: + List[str]: List of unique URLs for all documentation pages + """ + return await list_documentation_pages_helper(ctx.deps.supabase) + +@pydantic_ai_coder.tool +async def get_page_content(ctx: RunContext[PydanticAIDeps], url: str) -> str: + """ + Retrieve the full content of a specific documentation page by combining all its chunks. 
+ + Args: + ctx: The context including the Supabase client + url: The URL of the page to retrieve + + Returns: + str: The complete page content with all chunks combined in order + """ + try: + # Query Supabase for all chunks of this URL, ordered by chunk_number + result = ctx.deps.supabase.from_('site_pages') \ + .select('title, content, chunk_number') \ + .eq('url', url) \ + .eq('metadata->>source', 'pydantic_ai_docs') \ + .order('chunk_number') \ + .execute() + + if not result.data: + return f"No content found for URL: {url}" + + # Format the page with its title and all chunks + page_title = result.data[0]['title'].split(' - ')[0] # Get the main title + formatted_content = [f"# {page_title}\n"] + + # Add each chunk's content + for chunk in result.data: + formatted_content.append(chunk['content']) + + # Join everything together + return "\n\n".join(formatted_content) + + except Exception as e: + print(f"Error retrieving page content: {e}") + return f"Error retrieving page content: {str(e)}" \ No newline at end of file diff --git a/iterations/v2-agentic-workflow/requirements.txt b/iterations/v2-agentic-workflow/requirements.txt new file mode 100644 index 0000000..0fdac91 Binary files /dev/null and b/iterations/v2-agentic-workflow/requirements.txt differ diff --git a/iterations/v2-agentic-workflow/site_pages.sql b/iterations/v2-agentic-workflow/site_pages.sql new file mode 100644 index 0000000..6354669 --- /dev/null +++ b/iterations/v2-agentic-workflow/site_pages.sql @@ -0,0 +1,72 @@ +-- Enable the pgvector extension +create extension if not exists vector; + +-- Create the documentation chunks table +create table site_pages ( + id bigserial primary key, + url varchar not null, + chunk_number integer not null, + title varchar not null, + summary varchar not null, + content text not null, -- Added content column + metadata jsonb not null default '{}'::jsonb, -- Added metadata column + embedding vector(1536), -- OpenAI embeddings are 1536 dimensions + created_at timestamp with time zone default timezone('utc'::text, now()) not null, + + -- Add a unique constraint to prevent duplicate chunks for the same URL + unique(url, chunk_number) +); + +-- Create an index for better vector similarity search performance +create index on site_pages using ivfflat (embedding vector_cosine_ops); + +-- Create an index on metadata for faster filtering +create index idx_site_pages_metadata on site_pages using gin (metadata); + +-- Create a function to search for documentation chunks +create function match_site_pages ( + query_embedding vector(1536), + match_count int default 10, + filter jsonb DEFAULT '{}'::jsonb +) returns table ( + id bigint, + url varchar, + chunk_number integer, + title varchar, + summary varchar, + content text, + metadata jsonb, + similarity float +) +language plpgsql +as $$ +#variable_conflict use_column +begin + return query + select + id, + url, + chunk_number, + title, + summary, + content, + metadata, + 1 - (site_pages.embedding <=> query_embedding) as similarity + from site_pages + where metadata @> filter + order by site_pages.embedding <=> query_embedding + limit match_count; +end; +$$; + +-- Everything above will work for any PostgreSQL database. 
The below commands are for Supabase security + +-- Enable RLS on the table +alter table site_pages enable row level security; + +-- Create a policy that allows anyone to read +create policy "Allow public read access" + on site_pages + for select + to public + using (true); \ No newline at end of file diff --git a/iterations/v2-agentic-workflow/streamlit_ui.py b/iterations/v2-agentic-workflow/streamlit_ui.py new file mode 100644 index 0000000..436282a --- /dev/null +++ b/iterations/v2-agentic-workflow/streamlit_ui.py @@ -0,0 +1,114 @@ +from __future__ import annotations +from typing import Literal, TypedDict +from langgraph.types import Command +from openai import AsyncOpenAI +from supabase import Client +import streamlit as st +import logfire +import asyncio +import json +import uuid +import os + +# Import all the message part classes +from pydantic_ai.messages import ( + ModelMessage, + ModelRequest, + ModelResponse, + SystemPromptPart, + UserPromptPart, + TextPart, + ToolCallPart, + ToolReturnPart, + RetryPromptPart, + ModelMessagesTypeAdapter +) + +from archon_graph import agentic_flow + +# Load environment variables +from dotenv import load_dotenv +load_dotenv() + +openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) +supabase: Client = Client( + os.getenv("SUPABASE_URL"), + os.getenv("SUPABASE_SERVICE_KEY") +) + +# Configure logfire to suppress warnings (optional) +logfire.configure(send_to_logfire='never') + +@st.cache_resource +def get_thread_id(): + return str(uuid.uuid4()) + +thread_id = get_thread_id() + +async def run_agent_with_streaming(user_input: str): + """ + Run the agent with streaming text for the user_input prompt, + while maintaining the entire conversation in `st.session_state.messages`. + """ + config = { + "configurable": { + "thread_id": thread_id + } + } + + # First message from user + if len(st.session_state.messages) == 1: + async for msg in agentic_flow.astream( + {"latest_user_message": user_input}, config, stream_mode="custom" + ): + yield msg + # Continue the conversation + else: + async for msg in agentic_flow.astream( + Command(resume=user_input), config, stream_mode="custom" + ): + yield msg + + +async def main(): + st.title("Archon - Agent Builder") + st.write("Describe to me an AI agent you want to build and I'll code it for you with Pydantic AI.") + st.write("Example: Build me an AI agent that can search the web with the Brave API.") + + # Initialize chat history in session state if not present + if "messages" not in st.session_state: + st.session_state.messages = [] + + # Display chat messages from history on app rerun + for message in st.session_state.messages: + message_type = message["type"] + if message_type in ["human", "ai", "system"]: + with st.chat_message(message_type): + st.markdown(message["content"]) + + # Chat input for the user + user_input = st.chat_input("What do you want to build today?") + + if user_input: + # We append a new request to the conversation explicitly + st.session_state.messages.append({"type": "human", "content": user_input}) + + # Display user prompt in the UI + with st.chat_message("user"): + st.markdown(user_input) + + # Display assistant response in chat message container + response_content = "" + with st.chat_message("assistant"): + message_placeholder = st.empty() # Placeholder for updating the message + # Run the async generator to fetch responses + async for chunk in run_agent_with_streaming(user_input): + response_content += chunk + # Update the placeholder with the current response content + 
message_placeholder.markdown(response_content) + + st.session_state.messages.append({"type": "ai", "content": response_content}) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/langgraph.json b/langgraph.json new file mode 100644 index 0000000..7dfa2d5 --- /dev/null +++ b/langgraph.json @@ -0,0 +1,7 @@ +{ + "dependencies": ["."], + "graphs": { + "agent": "./archon_graph.py:agentic_flow" + }, + "env": ".env" +} diff --git a/public/Archon.png b/public/Archon.png new file mode 100644 index 0000000..6bc5e48 Binary files /dev/null and b/public/Archon.png differ diff --git a/pydantic_ai_coder.py b/pydantic_ai_coder.py new file mode 100644 index 0000000..af39a24 --- /dev/null +++ b/pydantic_ai_coder.py @@ -0,0 +1,219 @@ +from __future__ import annotations as _annotations + +from dataclasses import dataclass +from dotenv import load_dotenv +import logfire +import asyncio +import httpx +import os + +from pydantic_ai import Agent, ModelRetry, RunContext +from pydantic_ai.models.openai import OpenAIModel +from openai import AsyncOpenAI +from supabase import Client +from typing import List + +load_dotenv() + +llm = os.getenv('PRIMARY_MODEL', 'gpt-4o-mini') +base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1') +api_key = os.getenv('LLM_API_KEY', 'no-llm-api-key-provided') +model = OpenAIModel(llm, base_url=base_url, api_key=api_key) + +logfire.configure(send_to_logfire='if-token-present') + +@dataclass +class PydanticAIDeps: + supabase: Client + openai_client: AsyncOpenAI + reasoner_output: str + +system_prompt = """ +~~ CONTEXT: ~~ + +You are an expert at Pydantic AI - a Python AI agent framework that you have access to all the documentation to, +including examples, an API reference, and other resources to help you build Pydantic AI agents. + +~~ GOAL: ~~ + +Your only job is to help the user create an AI agent with Pydantic AI. +The user will describe the AI agent they want to build, or if they don't, guide them towards doing so. +You will take their requirements, and then search through the Pydantic AI documentation with the tools provided +to find all the necessary information to create the AI agent with correct code. + +It's important for you to search through multiple Pydantic AI documentation pages to get all the information you need. +Almost never stick to just one page - use RAG and the other documentation tools multiple times when you are creating +an AI agent from scratch for the user. + +~~ STRUCTURE: ~~ + +When you build an AI agent from scratch, split the agent into this files and give the code for each: +- `agent.py`: The main agent file, which is where the Pydantic AI agent is defined. +- `agent_tools.py`: A tools file for the agent, which is where all the tool functions are defined. Use this for more complex agents. +- `agent_prompts.py`: A prompts file for the agent, which includes all system prompts and other prompts used by the agent. Use this when there are many prompts or large ones. +- `.env.example`: An example `.env` file - specify each variable that the user will need to fill in and a quick comment above each one for how to do so. +- `requirements.txt`: Don't include any versions, just the top level package names needed for the agent. + +~~ INSTRUCTIONS: ~~ + +- Don't ask the user before taking an action, just do it. Always make sure you look at the documentation with the provided tools before writing any code. +- When you first look at the documentation, always start with RAG. 
+Then also always check the list of available documentation pages and retrieve the content of page(s) if it'll help. +- Always let the user know when you didn't find the answer in the documentation or the right URL - be honest. +- Helpful tip: when starting a new AI agent build, it's a good idea to look at the 'weather agent' in the docs as an example. +- When starting a new AI agent build, always produce the full code for the AI agent - never tell the user to finish a tool/function. +- When refining an existing AI agent build in a conversation, just share the code changes necessary. +- Each time you respond to the user, ask them to let you know either if they need changes or the code looks good. +""" + +pydantic_ai_coder = Agent( + model, + system_prompt=system_prompt, + deps_type=PydanticAIDeps, + retries=2 +) + +@pydantic_ai_coder.system_prompt +def add_reasoner_output(ctx: RunContext[str]) -> str: + return f""" + \n\nAdditional thoughts/instructions from the reasoner LLM. + This scope includes documentation pages for you to search as well: + {ctx.deps.reasoner_output} + """ + + # Add this in to get some crazy tool calling: + # You must get ALL documentation pages listed in the scope. + +async def get_embedding(text: str, openai_client: AsyncOpenAI) -> List[float]: + """Get embedding vector from OpenAI.""" + try: + response = await openai_client.embeddings.create( + model="text-embedding-3-small", + input=text + ) + return response.data[0].embedding + except Exception as e: + print(f"Error getting embedding: {e}") + return [0] * 1536 # Return zero vector on error + +@pydantic_ai_coder.tool +async def retrieve_relevant_documentation(ctx: RunContext[PydanticAIDeps], user_query: str) -> str: + """ + Retrieve relevant documentation chunks based on the query with RAG. + + Args: + ctx: The context including the Supabase client and OpenAI client + user_query: The user's question or query + + Returns: + A formatted string containing the top 5 most relevant documentation chunks + """ + try: + # Get the embedding for the query + query_embedding = await get_embedding(user_query, ctx.deps.openai_client) + + # Query Supabase for relevant documents + result = ctx.deps.supabase.rpc( + 'match_site_pages', + { + 'query_embedding': query_embedding, + 'match_count': 5, + 'filter': {'source': 'pydantic_ai_docs'} + } + ).execute() + + if not result.data: + return "No relevant documentation found." + + # Format the results + formatted_chunks = [] + for doc in result.data: + chunk_text = f""" +# {doc['title']} + +{doc['content']} +""" + formatted_chunks.append(chunk_text) + + # Join all chunks with a separator + return "\n\n---\n\n".join(formatted_chunks) + + except Exception as e: + print(f"Error retrieving documentation: {e}") + return f"Error retrieving documentation: {str(e)}" + +async def list_documentation_pages_helper(supabase: Client) -> List[str]: + """ + Function to retrieve a list of all available Pydantic AI documentation pages. + This is called by the list_documentation_pages tool and also externally + to fetch documentation pages for the reasoner LLM. 
+ + Returns: + List[str]: List of unique URLs for all documentation pages + """ + try: + # Query Supabase for unique URLs where source is pydantic_ai_docs + result = supabase.from_('site_pages') \ + .select('url') \ + .eq('metadata->>source', 'pydantic_ai_docs') \ + .execute() + + if not result.data: + return [] + + # Extract unique URLs + urls = sorted(set(doc['url'] for doc in result.data)) + return urls + + except Exception as e: + print(f"Error retrieving documentation pages: {e}") + return [] + +@pydantic_ai_coder.tool +async def list_documentation_pages(ctx: RunContext[PydanticAIDeps]) -> List[str]: + """ + Retrieve a list of all available Pydantic AI documentation pages. + + Returns: + List[str]: List of unique URLs for all documentation pages + """ + return await list_documentation_pages_helper(ctx.deps.supabase) + +@pydantic_ai_coder.tool +async def get_page_content(ctx: RunContext[PydanticAIDeps], url: str) -> str: + """ + Retrieve the full content of a specific documentation page by combining all its chunks. + + Args: + ctx: The context including the Supabase client + url: The URL of the page to retrieve + + Returns: + str: The complete page content with all chunks combined in order + """ + try: + # Query Supabase for all chunks of this URL, ordered by chunk_number + result = ctx.deps.supabase.from_('site_pages') \ + .select('title, content, chunk_number') \ + .eq('url', url) \ + .eq('metadata->>source', 'pydantic_ai_docs') \ + .order('chunk_number') \ + .execute() + + if not result.data: + return f"No content found for URL: {url}" + + # Format the page with its title and all chunks + page_title = result.data[0]['title'].split(' - ')[0] # Get the main title + formatted_content = [f"# {page_title}\n"] + + # Add each chunk's content + for chunk in result.data: + formatted_content.append(chunk['content']) + + # Join everything together + return "\n\n".join(formatted_content) + + except Exception as e: + print(f"Error retrieving page content: {e}") + return f"Error retrieving page content: {str(e)}" \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0fdac91 Binary files /dev/null and b/requirements.txt differ diff --git a/site_pages.sql b/site_pages.sql new file mode 100644 index 0000000..6354669 --- /dev/null +++ b/site_pages.sql @@ -0,0 +1,72 @@ +-- Enable the pgvector extension +create extension if not exists vector; + +-- Create the documentation chunks table +create table site_pages ( + id bigserial primary key, + url varchar not null, + chunk_number integer not null, + title varchar not null, + summary varchar not null, + content text not null, -- Added content column + metadata jsonb not null default '{}'::jsonb, -- Added metadata column + embedding vector(1536), -- OpenAI embeddings are 1536 dimensions + created_at timestamp with time zone default timezone('utc'::text, now()) not null, + + -- Add a unique constraint to prevent duplicate chunks for the same URL + unique(url, chunk_number) +); + +-- Create an index for better vector similarity search performance +create index on site_pages using ivfflat (embedding vector_cosine_ops); + +-- Create an index on metadata for faster filtering +create index idx_site_pages_metadata on site_pages using gin (metadata); + +-- Create a function to search for documentation chunks +create function match_site_pages ( + query_embedding vector(1536), + match_count int default 10, + filter jsonb DEFAULT '{}'::jsonb +) returns table ( + id bigint, + url varchar, + chunk_number 
integer, + title varchar, + summary varchar, + content text, + metadata jsonb, + similarity float +) +language plpgsql +as $$ +#variable_conflict use_column +begin + return query + select + id, + url, + chunk_number, + title, + summary, + content, + metadata, + 1 - (site_pages.embedding <=> query_embedding) as similarity + from site_pages + where metadata @> filter + order by site_pages.embedding <=> query_embedding + limit match_count; +end; +$$; + +-- Everything above will work for any PostgreSQL database. The below commands are for Supabase security + +-- Enable RLS on the table +alter table site_pages enable row level security; + +-- Create a policy that allows anyone to read +create policy "Allow public read access" + on site_pages + for select + to public + using (true); \ No newline at end of file diff --git a/streamlit_ui.py b/streamlit_ui.py new file mode 100644 index 0000000..436282a --- /dev/null +++ b/streamlit_ui.py @@ -0,0 +1,114 @@ +from __future__ import annotations +from typing import Literal, TypedDict +from langgraph.types import Command +from openai import AsyncOpenAI +from supabase import Client +import streamlit as st +import logfire +import asyncio +import json +import uuid +import os + +# Import all the message part classes +from pydantic_ai.messages import ( + ModelMessage, + ModelRequest, + ModelResponse, + SystemPromptPart, + UserPromptPart, + TextPart, + ToolCallPart, + ToolReturnPart, + RetryPromptPart, + ModelMessagesTypeAdapter +) + +from archon_graph import agentic_flow + +# Load environment variables +from dotenv import load_dotenv +load_dotenv() + +openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) +supabase: Client = Client( + os.getenv("SUPABASE_URL"), + os.getenv("SUPABASE_SERVICE_KEY") +) + +# Configure logfire to suppress warnings (optional) +logfire.configure(send_to_logfire='never') + +@st.cache_resource +def get_thread_id(): + return str(uuid.uuid4()) + +thread_id = get_thread_id() + +async def run_agent_with_streaming(user_input: str): + """ + Run the agent with streaming text for the user_input prompt, + while maintaining the entire conversation in `st.session_state.messages`. 
+ """ + config = { + "configurable": { + "thread_id": thread_id + } + } + + # First message from user + if len(st.session_state.messages) == 1: + async for msg in agentic_flow.astream( + {"latest_user_message": user_input}, config, stream_mode="custom" + ): + yield msg + # Continue the conversation + else: + async for msg in agentic_flow.astream( + Command(resume=user_input), config, stream_mode="custom" + ): + yield msg + + +async def main(): + st.title("Archon - Agent Builder") + st.write("Describe to me an AI agent you want to build and I'll code it for you with Pydantic AI.") + st.write("Example: Build me an AI agent that can search the web with the Brave API.") + + # Initialize chat history in session state if not present + if "messages" not in st.session_state: + st.session_state.messages = [] + + # Display chat messages from history on app rerun + for message in st.session_state.messages: + message_type = message["type"] + if message_type in ["human", "ai", "system"]: + with st.chat_message(message_type): + st.markdown(message["content"]) + + # Chat input for the user + user_input = st.chat_input("What do you want to build today?") + + if user_input: + # We append a new request to the conversation explicitly + st.session_state.messages.append({"type": "human", "content": user_input}) + + # Display user prompt in the UI + with st.chat_message("user"): + st.markdown(user_input) + + # Display assistant response in chat message container + response_content = "" + with st.chat_message("assistant"): + message_placeholder = st.empty() # Placeholder for updating the message + # Run the async generator to fetch responses + async for chunk in run_agent_with_streaming(user_input): + response_content += chunk + # Update the placeholder with the current response content + message_placeholder.markdown(response_content) + + st.session_state.messages.append({"type": "ai", "content": response_content}) + + +if __name__ == "__main__": + asyncio.run(main())