First two iterations of Archon

This commit is contained in:
Cole Medin 2025-02-07 15:04:02 -06:00
parent c87bf34360
commit e2805b8757
28 changed files with 2887 additions and 0 deletions

33
.env.example Normal file
View File

@ -0,0 +1,33 @@
# Base URL for the OpenAI instance (default is https://api.openai.com/v1)
# OpenAI: https://api.openai.com/v1
# Ollama (example): http://localhost:11434/v1
# OpenRouter: https://openrouter.ai/api/v1
BASE_URL=
# Get your Open AI API Key by following these instructions -
# https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key
# Even if using OpenRouter/Ollama, you still need to set this for the embedding model.
# Future versions of Archon will be more flexible with this.
OPENAI_API_KEY=
# For OpenAI: https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key
# For OpenRouter: https://openrouter.ai/keys
LLM_API_KEY=
# For the Supabase version (sample_supabase_agent.py), set your Supabase URL and Service Key.
# Get your SUPABASE_URL from the API section of your Supabase project settings -
# https://supabase.com/dashboard/project/<your project ID>/settings/api
SUPABASE_URL=
# Get your SUPABASE_SERVICE_KEY from the API section of your Supabase project settings -
# https://supabase.com/dashboard/project/<your project ID>/settings/api
# On this page it is called the service_role secret.
SUPABASE_SERVICE_KEY=
# The LLM you want to use for the reasoner (o3-mini, R1, QwQ, etc.).
# Example: o3-mini
REASONER_MODEL=
# The LLM you want to use for the primary agent/coder.
# Example: gpt-4o-mini
PRIMARY_MODEL=

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Folders
workbench
__pycache__
venv
.langgraph_api
# Files
.env

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 oTTomator and Archon contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

150
README.md Normal file
View File

@ -0,0 +1,150 @@
# Archon - AI Agent Builder
<img src="public/Archon.png" alt="Archon Logo" />
<div align="center" style="margin-top: 20px;margin-bottom: 30px">
<h3>🚀 **CURRENT VERSION** 🚀</h3>
**[ V2 - Agentic Workflow ]**
*Using LangGraph + Pydantic AI for multi-agent orchestration and planning*
</div>
Archon is an AI meta-agent designed to autonomously build, refine, and optimize other AI agents.
It serves both as a practical tool for developers and as an educational framework demonstrating the evolution of agentic systems.
Archon will be developed in iterations, starting with just a simple Pydantic AI agent that can build other Pydantic AI agents,
all the way to a full agentic workflow using LangGraph that can build other AI agents with any framework.
Through its iterative development, Archon showcases the power of planning, feedback loops, and domain-specific knowledge in creating robust AI agents.
The current version of Archon is V2 as mentioned above - see [V2 Documentation](iterations/v2-agentic-workflow/README.md) for details.
## Vision
Archon demonstrates three key principles in modern AI development:
1. **Agentic Reasoning**: Planning, iterative feedback, and self-evaluation overcome the limitations of purely reactive systems
2. **Domain Knowledge Integration**: Seamless embedding of frameworks like Pydantic AI and LangGraph within autonomous workflows
3. **Scalable Architecture**: Modular design supporting maintainability, cost optimization, and ethical AI practices
## Project Evolution
### V1: Single-Agent Foundation
- Basic RAG-powered agent using Pydantic AI
- Supabase vector database for documentation storage
- Simple code generation without validation
- [Learn more about V1](iterations/v1-single-agent/README.md)
### V2: Current - Agentic Workflow (LangGraph)
- Multi-agent system with planning and execution separation
- Reasoning LLM (O3-mini/R1) for architecture planning
- LangGraph for workflow orchestration
- Support for local LLMs via Ollama
- [Learn more about V2](iterations/v2-agentic-workflow/README.md)
### Future Iterations
- V3: Self-Feedback Loop - Automated validation and error correction
- V4: Tool Library Integration - Pre-built external tool incorporation
- V5: Multi-Framework Support - Framework-agnostic agent generation
- V6: Autonomous Framework Learning - Self-updating framework adapters
### Future Integrations
- Docker
- LangSmith
- MCP
- Other frameworks besides Pydantic AI
- Other vector databases besides Supabase
## Getting Started with V2 (current version)
Since V2 is the current version of Archon, all the code for V2 is in both the `archon` and `archon/iterations/v2-agentic-workflow` directories.
### Prerequisites
- Python 3.11+
- Supabase account and database
- OpenAI/OpenRouter API key or Ollama for local LLMs
- Streamlit (for web interface)
### Installation
1. Clone the repository:
```bash
git clone https://github.com/coleam00/archon.git
cd archon
```
2. Install dependencies:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
```
3. Configure environment:
- Rename `.env.example` to `.env`
- Edit `.env` with your settings:
```env
BASE_URL=https://api.openai.com/v1 for OpenAI, https://api.openrouter.ai/v1 for OpenRouter, or your Ollama URL
LLM_API_KEY=your_openai_or_openrouter_api_key
OPENAI_API_KEY=your_openai_api_key # Required for embeddings
SUPABASE_URL=your_supabase_url
SUPABASE_SERVICE_KEY=your_supabase_service_key
PRIMARY_MODEL=gpt-4o-mini # Main agent model
REASONER_MODEL=o3-mini # Planning model
```
### Quick Start
1. Set up the database:
- Execute `site_pages.sql` in your Supabase SQL Editor
- This creates tables and enables vector similarity search
2. Crawl documentation:
```bash
python crawl_pydantic_ai_docs.py
```
3. Launch the UI:
```bash
streamlit run streamlit_ui.py
```
Visit `http://localhost:8501` to start building AI agents!
## Architecture
### Current V2 Components
- `archon_graph.py`: LangGraph workflow and agent coordination
- `pydantic_ai_coder.py`: Main coding agent with RAG capabilities
- `crawl_pydantic_ai_docs.py`: Documentation processor
- `streamlit_ui.py`: Interactive web interface
- `site_pages.sql`: Database schema
### Database Schema
```sql
CREATE TABLE site_pages (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
url TEXT,
chunk_number INTEGER,
title TEXT,
summary TEXT,
content TEXT,
metadata JSONB,
embedding VECTOR(1536)
);
```
## Contributing
We welcome contributions! Whether you're fixing bugs, adding features, or improving documentation, please feel free to submit a Pull Request.
## License
[MIT License](LICENSE)
---
For version-specific details:
- [V1 Documentation](iterations/v1-single-agent/README.md)
- [V2 Documentation](iterations/v2-agentic-workflow/README.md)

201
archon_graph.py Normal file
View File

@ -0,0 +1,201 @@
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai import Agent, RunContext
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated, List, Any
from langgraph.config import get_stream_writer
from langgraph.types import interrupt
from dotenv import load_dotenv
from openai import AsyncOpenAI
from supabase import Client
import logfire
import os
# Import the message classes from Pydantic AI
from pydantic_ai.messages import (
ModelMessage,
ModelMessagesTypeAdapter
)
from pydantic_ai_coder import pydantic_ai_coder, PydanticAIDeps, list_documentation_pages_helper
# Load environment variables
load_dotenv()
# Configure logfire to suppress warnings (optional)
logfire.configure(send_to_logfire='never')
base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1')
api_key = os.getenv('LLM_API_KEY', 'no-llm-api-key-provided')
is_ollama = "localhost" in base_url.lower()
reasoner_llm_model = os.getenv('REASONER_MODEL', 'o3-mini')
reasoner = Agent(
OpenAIModel(reasoner_llm_model, base_url=base_url, api_key=api_key),
system_prompt='You are an expert at coding AI agents with Pydantic AI and defining the scope for doing so.',
)
primary_llm_model = os.getenv('PRIMARY_MODEL', 'gpt-4o-mini')
router_agent = Agent(
OpenAIModel(primary_llm_model, base_url=base_url, api_key=api_key),
system_prompt='Your job is to route the user message either to the end of the conversation or to continue coding the AI agent.',
)
end_conversation_agent = Agent(
OpenAIModel(primary_llm_model, base_url=base_url, api_key=api_key),
system_prompt='Your job is to end a conversation for creating an AI agent by giving instructions for how to execute the agent and they saying a nice goodbye to the user.',
)
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
supabase: Client = Client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_SERVICE_KEY")
)
# Define state schema
class AgentState(TypedDict):
latest_user_message: str
messages: Annotated[List[bytes], lambda x, y: x + y]
scope: str
# Scope Definition Node with Reasoner LLM
async def define_scope_with_reasoner(state: AgentState):
# First, get the documentation pages so the reasoner can decide which ones are necessary
documentation_pages = await list_documentation_pages_helper(supabase)
documentation_pages_str = "\n".join(documentation_pages)
# Then, use the reasoner to define the scope
prompt = f"""
User AI Agent Request: {state['latest_user_message']}
Create detailed scope document for the AI agent including:
- Architecture diagram
- Core components
- External dependencies
- Testing strategy
Also based on these documentation pages available:
{documentation_pages_str}
Include a list of documentation pages that are relevant to creating this agent for the user in the scope document.
"""
result = await reasoner.run(prompt)
scope = result.data
# Save the scope to a file
scope_path = os.path.join("workbench", "scope.md")
os.makedirs("workbench", exist_ok=True)
with open(scope_path, "w", encoding="utf-8") as f:
f.write(scope)
return {"scope": scope}
# Coding Node with Feedback Handling
async def coder_agent(state: AgentState, writer):
# Prepare dependencies
deps = PydanticAIDeps(
supabase=supabase,
openai_client=openai_client,
reasoner_output=state['scope']
)
# Get the message history into the format for Pydantic AI
message_history: list[ModelMessage] = []
for message_row in state['messages']:
message_history.extend(ModelMessagesTypeAdapter.validate_json(message_row))
# Run the agent in a stream
if is_ollama:
writer = get_stream_writer()
result = await pydantic_ai_coder.run(state['latest_user_message'], deps=deps, message_history= message_history)
writer(result.data)
else:
async with pydantic_ai_coder.run_stream(
state['latest_user_message'],
deps=deps,
message_history= message_history
) as result:
# Stream partial text as it arrives
async for chunk in result.stream_text(delta=True):
writer(chunk)
# print(ModelMessagesTypeAdapter.validate_json(result.new_messages_json()))
return {"messages": [result.new_messages_json()]}
# Interrupt the graph to get the user's next message
def get_next_user_message(state: AgentState):
value = interrupt({})
# Set the user's latest message for the LLM to continue the conversation
return {
"latest_user_message": value
}
# Determine if the user is finished creating their AI agent or not
async def route_user_message(state: AgentState):
prompt = f"""
The user has sent a message:
{state['latest_user_message']}
If the user wants to end the conversation, respond with just the text "finish_conversation".
If the user wants to continue coding the AI agent, respond with just the text "coder_agent".
"""
result = await router_agent.run(prompt)
next_action = result.data
if next_action == "finish_conversation":
return "finish_conversation"
else:
return "coder_agent"
# End of conversation agent to give instructions for executing the agent
async def finish_conversation(state: AgentState, writer):
# Get the message history into the format for Pydantic AI
message_history: list[ModelMessage] = []
for message_row in state['messages']:
message_history.extend(ModelMessagesTypeAdapter.validate_json(message_row))
# Run the agent in a stream
if is_ollama:
writer = get_stream_writer()
result = await end_conversation_agent.run(state['latest_user_message'], message_history= message_history)
writer(result.data)
else:
async with end_conversation_agent.run_stream(
state['latest_user_message'],
message_history= message_history
) as result:
# Stream partial text as it arrives
async for chunk in result.stream_text(delta=True):
writer(chunk)
return {"messages": [result.new_messages_json()]}
# Build workflow
builder = StateGraph(AgentState)
# Add nodes
builder.add_node("define_scope_with_reasoner", define_scope_with_reasoner)
builder.add_node("coder_agent", coder_agent)
builder.add_node("get_next_user_message", get_next_user_message)
builder.add_node("finish_conversation", finish_conversation)
# Set edges
builder.add_edge(START, "define_scope_with_reasoner")
builder.add_edge("define_scope_with_reasoner", "coder_agent")
builder.add_edge("coder_agent", "get_next_user_message")
builder.add_conditional_edges(
"get_next_user_message",
route_user_message,
{"coder_agent": "coder_agent", "finish_conversation": "finish_conversation"}
)
builder.add_edge("finish_conversation", END)
# Configure persistence
memory = MemorySaver()
agentic_flow = builder.compile(checkpointer=memory)

245
crawl_pydantic_ai_docs.py Normal file
View File

@ -0,0 +1,245 @@
import os
import sys
import json
import asyncio
import requests
from xml.etree import ElementTree
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime, timezone
from urllib.parse import urlparse
from dotenv import load_dotenv
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from openai import AsyncOpenAI
from supabase import create_client, Client
load_dotenv()
# Initialize OpenAI and Supabase clients
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
supabase: Client = create_client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_SERVICE_KEY")
)
@dataclass
class ProcessedChunk:
url: str
chunk_number: int
title: str
summary: str
content: str
metadata: Dict[str, Any]
embedding: List[float]
def chunk_text(text: str, chunk_size: int = 5000) -> List[str]:
"""Split text into chunks, respecting code blocks and paragraphs."""
chunks = []
start = 0
text_length = len(text)
while start < text_length:
# Calculate end position
end = start + chunk_size
# If we're at the end of the text, just take what's left
if end >= text_length:
chunks.append(text[start:].strip())
break
# Try to find a code block boundary first (```)
chunk = text[start:end]
code_block = chunk.rfind('```')
if code_block != -1 and code_block > chunk_size * 0.3:
end = start + code_block
# If no code block, try to break at a paragraph
elif '\n\n' in chunk:
# Find the last paragraph break
last_break = chunk.rfind('\n\n')
if last_break > chunk_size * 0.3: # Only break if we're past 30% of chunk_size
end = start + last_break
# If no paragraph break, try to break at a sentence
elif '. ' in chunk:
# Find the last sentence break
last_period = chunk.rfind('. ')
if last_period > chunk_size * 0.3: # Only break if we're past 30% of chunk_size
end = start + last_period + 1
# Extract chunk and clean it up
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
# Move start position for next chunk
start = max(start + 1, end)
return chunks
async def get_title_and_summary(chunk: str, url: str) -> Dict[str, str]:
"""Extract title and summary using GPT-4."""
system_prompt = """You are an AI that extracts titles and summaries from documentation chunks.
Return a JSON object with 'title' and 'summary' keys.
For the title: If this seems like the start of a document, extract its title. If it's a middle chunk, derive a descriptive title.
For the summary: Create a concise summary of the main points in this chunk.
Keep both title and summary concise but informative."""
try:
response = await openai_client.chat.completions.create(
model=os.getenv("LLM_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"URL: {url}\n\nContent:\n{chunk[:1000]}..."} # Send first 1000 chars for context
],
response_format={ "type": "json_object" }
)
return json.loads(response.choices[0].message.content)
except Exception as e:
print(f"Error getting title and summary: {e}")
return {"title": "Error processing title", "summary": "Error processing summary"}
async def get_embedding(text: str) -> List[float]:
"""Get embedding vector from OpenAI."""
try:
response = await openai_client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Error getting embedding: {e}")
return [0] * 1536 # Return zero vector on error
async def process_chunk(chunk: str, chunk_number: int, url: str) -> ProcessedChunk:
"""Process a single chunk of text."""
# Get title and summary
extracted = await get_title_and_summary(chunk, url)
# Get embedding
embedding = await get_embedding(chunk)
# Create metadata
metadata = {
"source": "pydantic_ai_docs",
"chunk_size": len(chunk),
"crawled_at": datetime.now(timezone.utc).isoformat(),
"url_path": urlparse(url).path
}
return ProcessedChunk(
url=url,
chunk_number=chunk_number,
title=extracted['title'],
summary=extracted['summary'],
content=chunk, # Store the original chunk content
metadata=metadata,
embedding=embedding
)
async def insert_chunk(chunk: ProcessedChunk):
"""Insert a processed chunk into Supabase."""
try:
data = {
"url": chunk.url,
"chunk_number": chunk.chunk_number,
"title": chunk.title,
"summary": chunk.summary,
"content": chunk.content,
"metadata": chunk.metadata,
"embedding": chunk.embedding
}
result = supabase.table("site_pages").insert(data).execute()
print(f"Inserted chunk {chunk.chunk_number} for {chunk.url}")
return result
except Exception as e:
print(f"Error inserting chunk: {e}")
return None
async def process_and_store_document(url: str, markdown: str):
"""Process a document and store its chunks in parallel."""
# Split into chunks
chunks = chunk_text(markdown)
# Process chunks in parallel
tasks = [
process_chunk(chunk, i, url)
for i, chunk in enumerate(chunks)
]
processed_chunks = await asyncio.gather(*tasks)
# Store chunks in parallel
insert_tasks = [
insert_chunk(chunk)
for chunk in processed_chunks
]
await asyncio.gather(*insert_tasks)
async def crawl_parallel(urls: List[str], max_concurrent: int = 5):
"""Crawl multiple URLs in parallel with a concurrency limit."""
browser_config = BrowserConfig(
headless=True,
verbose=False,
extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
)
crawl_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
# Create the crawler instance
crawler = AsyncWebCrawler(config=browser_config)
await crawler.start()
try:
# Create a semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_concurrent)
async def process_url(url: str):
async with semaphore:
result = await crawler.arun(
url=url,
config=crawl_config,
session_id="session1"
)
if result.success:
print(f"Successfully crawled: {url}")
await process_and_store_document(url, result.markdown_v2.raw_markdown)
else:
print(f"Failed: {url} - Error: {result.error_message}")
# Process all URLs in parallel with limited concurrency
await asyncio.gather(*[process_url(url) for url in urls])
finally:
await crawler.close()
def get_pydantic_ai_docs_urls() -> List[str]:
"""Get URLs from Pydantic AI docs sitemap."""
sitemap_url = "https://ai.pydantic.dev/sitemap.xml"
try:
response = requests.get(sitemap_url)
response.raise_for_status()
# Parse the XML
root = ElementTree.fromstring(response.content)
# Extract all URLs from the sitemap
namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
urls = [loc.text for loc in root.findall('.//ns:loc', namespace)]
return urls
except Exception as e:
print(f"Error fetching sitemap: {e}")
return []
async def main():
# Get URLs from Pydantic AI docs
urls = get_pydantic_ai_docs_urls()
if not urls:
print("No URLs found to crawl")
return
print(f"Found {len(urls)} URLs to crawl")
await crawl_parallel(urls)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,19 @@
# Get your Open AI API Key by following these instructions -
# https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key
# You only need this environment variable set if you are using GPT (and not Ollama)
OPENAI_API_KEY=
# For the Supabase version (sample_supabase_agent.py), set your Supabase URL and Service Key.
# Get your SUPABASE_URL from the API section of your Supabase project settings -
# https://supabase.com/dashboard/project/<your project ID>/settings/api
SUPABASE_URL=
# Get your SUPABASE_SERVICE_KEY from the API section of your Supabase project settings -
# https://supabase.com/dashboard/project/<your project ID>/settings/api
# On this page it is called the service_role secret.
SUPABASE_SERVICE_KEY=
# The LLM you want to use from OpenAI. See the list of models here:
# https://platform.openai.com/docs/models
# Example: gpt-4o-mini
LLM_MODEL=

View File

@ -0,0 +1,122 @@
# Archon V1 - Basic Pydantic AI Agent to Build other Pydantic AI Agents
This is the first iteration of the Archon project - no use of LangGraph and built with a single AI agent to keep things very simple and introductory.
An intelligent documentation crawler and RAG (Retrieval-Augmented Generation) agent built using Pydantic AI and Supabase that is capable of building other Pydantic AI agents. The agent crawls the Pydantic AI documentation, stores content in a vector database, and provides Pydantic AI agent code by retrieving and analyzing relevant documentation chunks.
## Features
- Pydantic AI documentation crawling and chunking
- Vector database storage with Supabase
- Semantic search using OpenAI embeddings
- RAG-based question answering
- Support for code block preservation
- Streamlit UI for interactive querying
## Prerequisites
- Python 3.11+
- Supabase account and database
- OpenAI API key
- Streamlit (for web interface)
## Installation
1. Clone the repository:
```bash
git clone https://github.com/coleam00/archon.git
cd archon/iterations/v1-single-agent
```
2. Install dependencies (recommended to use a Python virtual environment):
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
```
3. Set up environment variables:
- Rename `.env.example` to `.env`
- Edit `.env` with your API keys and preferences:
```env
OPENAI_API_KEY=your_openai_api_key
SUPABASE_URL=your_supabase_url
SUPABASE_SERVICE_KEY=your_supabase_service_key
LLM_MODEL=gpt-4o-mini # or your preferred OpenAI model
```
## Usage
### Database Setup
Execute the SQL commands in `site_pages.sql` to:
1. Create the necessary tables
2. Enable vector similarity search
3. Set up Row Level Security policies
In Supabase, do this by going to the "SQL Editor" tab and pasting in the SQL into the editor there. Then click "Run".
### Crawl Documentation
To crawl and store documentation in the vector database:
```bash
python crawl_pydantic_ai_docs.py
```
This will:
1. Fetch URLs from the documentation sitemap
2. Crawl each page and split into chunks
3. Generate embeddings and store in Supabase
### Streamlit Web Interface
For an interactive web interface to query the documentation:
```bash
streamlit run streamlit_ui.py
```
The interface will be available at `http://localhost:8501`
## Configuration
### Database Schema
The Supabase database uses the following schema:
```sql
CREATE TABLE site_pages (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
url TEXT,
chunk_number INTEGER,
title TEXT,
summary TEXT,
content TEXT,
metadata JSONB,
embedding VECTOR(1536)
);
```
### Chunking Configuration
You can configure chunking parameters in `crawl_pydantic_ai_docs.py`:
```python
chunk_size = 5000 # Characters per chunk
```
The chunker intelligently preserves:
- Code blocks
- Paragraph boundaries
- Sentence boundaries
## Project Structure
- `crawl_pydantic_ai_docs.py`: Documentation crawler and processor
- `pydantic_ai_expert.py`: RAG agent implementation
- `streamlit_ui.py`: Web interface
- `site_pages.sql`: Database setup commands
- `requirements.txt`: Project dependencies
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.

View File

@ -0,0 +1,245 @@
import os
import sys
import json
import asyncio
import requests
from xml.etree import ElementTree
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime, timezone
from urllib.parse import urlparse
from dotenv import load_dotenv
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from openai import AsyncOpenAI
from supabase import create_client, Client
load_dotenv()
# Initialize OpenAI and Supabase clients
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
supabase: Client = create_client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_SERVICE_KEY")
)
@dataclass
class ProcessedChunk:
url: str
chunk_number: int
title: str
summary: str
content: str
metadata: Dict[str, Any]
embedding: List[float]
def chunk_text(text: str, chunk_size: int = 5000) -> List[str]:
"""Split text into chunks, respecting code blocks and paragraphs."""
chunks = []
start = 0
text_length = len(text)
while start < text_length:
# Calculate end position
end = start + chunk_size
# If we're at the end of the text, just take what's left
if end >= text_length:
chunks.append(text[start:].strip())
break
# Try to find a code block boundary first (```)
chunk = text[start:end]
code_block = chunk.rfind('```')
if code_block != -1 and code_block > chunk_size * 0.3:
end = start + code_block
# If no code block, try to break at a paragraph
elif '\n\n' in chunk:
# Find the last paragraph break
last_break = chunk.rfind('\n\n')
if last_break > chunk_size * 0.3: # Only break if we're past 30% of chunk_size
end = start + last_break
# If no paragraph break, try to break at a sentence
elif '. ' in chunk:
# Find the last sentence break
last_period = chunk.rfind('. ')
if last_period > chunk_size * 0.3: # Only break if we're past 30% of chunk_size
end = start + last_period + 1
# Extract chunk and clean it up
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
# Move start position for next chunk
start = max(start + 1, end)
return chunks
async def get_title_and_summary(chunk: str, url: str) -> Dict[str, str]:
"""Extract title and summary using GPT-4."""
system_prompt = """You are an AI that extracts titles and summaries from documentation chunks.
Return a JSON object with 'title' and 'summary' keys.
For the title: If this seems like the start of a document, extract its title. If it's a middle chunk, derive a descriptive title.
For the summary: Create a concise summary of the main points in this chunk.
Keep both title and summary concise but informative."""
try:
response = await openai_client.chat.completions.create(
model=os.getenv("LLM_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"URL: {url}\n\nContent:\n{chunk[:1000]}..."} # Send first 1000 chars for context
],
response_format={ "type": "json_object" }
)
return json.loads(response.choices[0].message.content)
except Exception as e:
print(f"Error getting title and summary: {e}")
return {"title": "Error processing title", "summary": "Error processing summary"}
async def get_embedding(text: str) -> List[float]:
"""Get embedding vector from OpenAI."""
try:
response = await openai_client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Error getting embedding: {e}")
return [0] * 1536 # Return zero vector on error
async def process_chunk(chunk: str, chunk_number: int, url: str) -> ProcessedChunk:
"""Process a single chunk of text."""
# Get title and summary
extracted = await get_title_and_summary(chunk, url)
# Get embedding
embedding = await get_embedding(chunk)
# Create metadata
metadata = {
"source": "pydantic_ai_docs",
"chunk_size": len(chunk),
"crawled_at": datetime.now(timezone.utc).isoformat(),
"url_path": urlparse(url).path
}
return ProcessedChunk(
url=url,
chunk_number=chunk_number,
title=extracted['title'],
summary=extracted['summary'],
content=chunk, # Store the original chunk content
metadata=metadata,
embedding=embedding
)
async def insert_chunk(chunk: ProcessedChunk):
"""Insert a processed chunk into Supabase."""
try:
data = {
"url": chunk.url,
"chunk_number": chunk.chunk_number,
"title": chunk.title,
"summary": chunk.summary,
"content": chunk.content,
"metadata": chunk.metadata,
"embedding": chunk.embedding
}
result = supabase.table("site_pages").insert(data).execute()
print(f"Inserted chunk {chunk.chunk_number} for {chunk.url}")
return result
except Exception as e:
print(f"Error inserting chunk: {e}")
return None
async def process_and_store_document(url: str, markdown: str):
"""Process a document and store its chunks in parallel."""
# Split into chunks
chunks = chunk_text(markdown)
# Process chunks in parallel
tasks = [
process_chunk(chunk, i, url)
for i, chunk in enumerate(chunks)
]
processed_chunks = await asyncio.gather(*tasks)
# Store chunks in parallel
insert_tasks = [
insert_chunk(chunk)
for chunk in processed_chunks
]
await asyncio.gather(*insert_tasks)
async def crawl_parallel(urls: List[str], max_concurrent: int = 5):
"""Crawl multiple URLs in parallel with a concurrency limit."""
browser_config = BrowserConfig(
headless=True,
verbose=False,
extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
)
crawl_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
# Create the crawler instance
crawler = AsyncWebCrawler(config=browser_config)
await crawler.start()
try:
# Create a semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_concurrent)
async def process_url(url: str):
async with semaphore:
result = await crawler.arun(
url=url,
config=crawl_config,
session_id="session1"
)
if result.success:
print(f"Successfully crawled: {url}")
await process_and_store_document(url, result.markdown_v2.raw_markdown)
else:
print(f"Failed: {url} - Error: {result.error_message}")
# Process all URLs in parallel with limited concurrency
await asyncio.gather(*[process_url(url) for url in urls])
finally:
await crawler.close()
def get_pydantic_ai_docs_urls() -> List[str]:
"""Get URLs from Pydantic AI docs sitemap."""
sitemap_url = "https://ai.pydantic.dev/sitemap.xml"
try:
response = requests.get(sitemap_url)
response.raise_for_status()
# Parse the XML
root = ElementTree.fromstring(response.content)
# Extract all URLs from the sitemap
namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
urls = [loc.text for loc in root.findall('.//ns:loc', namespace)]
return urls
except Exception as e:
print(f"Error fetching sitemap: {e}")
return []
async def main():
# Get URLs from Pydantic AI docs
urls = get_pydantic_ai_docs_urls()
if not urls:
print("No URLs found to crawl")
return
print(f"Found {len(urls)} URLs to crawl")
await crawl_parallel(urls)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,193 @@
from __future__ import annotations as _annotations
from dataclasses import dataclass
from dotenv import load_dotenv
import logfire
import asyncio
import httpx
import os
from pydantic_ai import Agent, ModelRetry, RunContext
from pydantic_ai.models.openai import OpenAIModel
from openai import AsyncOpenAI
from supabase import Client
from typing import List
load_dotenv()
llm = os.getenv('LLM_MODEL', 'gpt-4o-mini')
model = OpenAIModel(llm)
logfire.configure(send_to_logfire='if-token-present')
@dataclass
class PydanticAIDeps:
supabase: Client
openai_client: AsyncOpenAI
system_prompt = """
~~ CONTEXT: ~~
You are an expert at Pydantic AI - a Python AI agent framework that you have access to all the documentation to,
including examples, an API reference, and other resources to help you build Pydantic AI agents.
~~ GOAL: ~~
Your only job is to help the user create an AI agent with Pydantic AI.
The user will describe the AI agent they want to build, or if they don't, guide them towards doing so.
You will take their requirements, and then search through the Pydantic AI documentation with the tools provided
to find all the necessary information to create the AI agent with correct code.
It's important for you to search through multiple Pydantic AI documentation pages to get all the information you need.
Almost never stick to just one page - use RAG and the other documentation tools multiple times when you are creating
an AI agent from scratch for the user.
~~ STRUCTURE: ~~
When you build an AI agent from scratch, split the agent into this files and give the code for each:
- `agent.py`: The main agent file, which is where the Pydantic AI agent is defined.
- `agent_tools.py`: A tools file for the agent, which is where all the tool functions are defined. Use this for more complex agents.
- `agent_prompts.py`: A prompts file for the agent, which includes all system prompts and other prompts used by the agent. Use this when there are many prompts or large ones.
- `.env.example`: An example `.env` file - specify each variable that the user will need to fill in and a quick comment above each one for how to do so.
- `requirements.txt`: Don't include any versions, just the top level package names needed for the agent.
~~ INSTRUCTIONS: ~~
- Don't ask the user before taking an action, just do it. Always make sure you look at the documentation with the provided tools before writing any code.
- When you first look at the documentation, always start with RAG.
Then also always check the list of available documentation pages and retrieve the content of page(s) if it'll help.
- Always let the user know when you didn't find the answer in the documentation or the right URL - be honest.
- Helpful tip: when starting a new AI agent build, it's a good idea to look at the 'weather agent' in the docs as an example.
- When starting a new AI agent build, always produce the full code for the AI agent - never tell the user to finish a tool/function.
- When refining an existing AI agent build in a conversation, just share the code changes necessary.
"""
pydantic_ai_coder = Agent(
model,
system_prompt=system_prompt,
deps_type=PydanticAIDeps,
retries=2
)
async def get_embedding(text: str, openai_client: AsyncOpenAI) -> List[float]:
"""Get embedding vector from OpenAI."""
try:
response = await openai_client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Error getting embedding: {e}")
return [0] * 1536 # Return zero vector on error
@pydantic_ai_coder.tool
async def retrieve_relevant_documentation(ctx: RunContext[PydanticAIDeps], user_query: str) -> str:
"""
Retrieve relevant documentation chunks based on the query with RAG.
Args:
ctx: The context including the Supabase client and OpenAI client
user_query: The user's question or query
Returns:
A formatted string containing the top 5 most relevant documentation chunks
"""
try:
# Get the embedding for the query
query_embedding = await get_embedding(user_query, ctx.deps.openai_client)
# Query Supabase for relevant documents
result = ctx.deps.supabase.rpc(
'match_site_pages',
{
'query_embedding': query_embedding,
'match_count': 5,
'filter': {'source': 'pydantic_ai_docs'}
}
).execute()
if not result.data:
return "No relevant documentation found."
# Format the results
formatted_chunks = []
for doc in result.data:
chunk_text = f"""
# {doc['title']}
{doc['content']}
"""
formatted_chunks.append(chunk_text)
# Join all chunks with a separator
return "\n\n---\n\n".join(formatted_chunks)
except Exception as e:
print(f"Error retrieving documentation: {e}")
return f"Error retrieving documentation: {str(e)}"
@pydantic_ai_coder.tool
async def list_documentation_pages(ctx: RunContext[PydanticAIDeps]) -> List[str]:
"""
Retrieve a list of all available Pydantic AI documentation pages.
Returns:
List[str]: List of unique URLs for all documentation pages
"""
try:
# Query Supabase for unique URLs where source is pydantic_ai_docs
result = ctx.deps.supabase.from_('site_pages') \
.select('url') \
.eq('metadata->>source', 'pydantic_ai_docs') \
.execute()
if not result.data:
return []
# Extract unique URLs
urls = sorted(set(doc['url'] for doc in result.data))
return urls
except Exception as e:
print(f"Error retrieving documentation pages: {e}")
return []
@pydantic_ai_coder.tool
async def get_page_content(ctx: RunContext[PydanticAIDeps], url: str) -> str:
"""
Retrieve the full content of a specific documentation page by combining all its chunks.
Args:
ctx: The context including the Supabase client
url: The URL of the page to retrieve
Returns:
str: The complete page content with all chunks combined in order
"""
try:
# Query Supabase for all chunks of this URL, ordered by chunk_number
result = ctx.deps.supabase.from_('site_pages') \
.select('title, content, chunk_number') \
.eq('url', url) \
.eq('metadata->>source', 'pydantic_ai_docs') \
.order('chunk_number') \
.execute()
if not result.data:
return f"No content found for URL: {url}"
# Format the page with its title and all chunks
page_title = result.data[0]['title'].split(' - ')[0] # Get the main title
formatted_content = [f"# {page_title}\n"]
# Add each chunk's content
for chunk in result.data:
formatted_content.append(chunk['content'])
# Join everything together
return "\n\n".join(formatted_content)
except Exception as e:
print(f"Error retrieving page content: {e}")
return f"Error retrieving page content: {str(e)}"

Binary file not shown.

View File

@ -0,0 +1,72 @@
-- Enable the pgvector extension
create extension if not exists vector;
-- Create the documentation chunks table
create table site_pages (
id bigserial primary key,
url varchar not null,
chunk_number integer not null,
title varchar not null,
summary varchar not null,
content text not null, -- Added content column
metadata jsonb not null default '{}'::jsonb, -- Added metadata column
embedding vector(1536), -- OpenAI embeddings are 1536 dimensions
created_at timestamp with time zone default timezone('utc'::text, now()) not null,
-- Add a unique constraint to prevent duplicate chunks for the same URL
unique(url, chunk_number)
);
-- Create an index for better vector similarity search performance
create index on site_pages using ivfflat (embedding vector_cosine_ops);
-- Create an index on metadata for faster filtering
create index idx_site_pages_metadata on site_pages using gin (metadata);
-- Create a function to search for documentation chunks
create function match_site_pages (
query_embedding vector(1536),
match_count int default 10,
filter jsonb DEFAULT '{}'::jsonb
) returns table (
id bigint,
url varchar,
chunk_number integer,
title varchar,
summary varchar,
content text,
metadata jsonb,
similarity float
)
language plpgsql
as $$
#variable_conflict use_column
begin
return query
select
id,
url,
chunk_number,
title,
summary,
content,
metadata,
1 - (site_pages.embedding <=> query_embedding) as similarity
from site_pages
where metadata @> filter
order by site_pages.embedding <=> query_embedding
limit match_count;
end;
$$;
-- Everything above will work for any PostgreSQL database. The below commands are for Supabase security
-- Enable RLS on the table
alter table site_pages enable row level security;
-- Create a policy that allows anyone to read
create policy "Allow public read access"
on site_pages
for select
to public
using (true);

View File

@ -0,0 +1,143 @@
from __future__ import annotations
from typing import Literal, TypedDict
import asyncio
import os
import streamlit as st
import json
import logfire
from supabase import Client
from openai import AsyncOpenAI
# Import all the message part classes
from pydantic_ai.messages import (
ModelMessage,
ModelRequest,
ModelResponse,
SystemPromptPart,
UserPromptPart,
TextPart,
ToolCallPart,
ToolReturnPart,
RetryPromptPart,
ModelMessagesTypeAdapter
)
from pydantic_ai_coder import pydantic_ai_coder, PydanticAIDeps
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
supabase: Client = Client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_SERVICE_KEY")
)
# Configure logfire to suppress warnings (optional)
logfire.configure(send_to_logfire='never')
class ChatMessage(TypedDict):
"""Format of messages sent to the browser/API."""
role: Literal['user', 'model']
timestamp: str
content: str
def display_message_part(part):
"""
Display a single part of a message in the Streamlit UI.
Customize how you display system prompts, user prompts,
tool calls, tool returns, etc.
"""
# system-prompt
if part.part_kind == 'system-prompt':
with st.chat_message("system"):
st.markdown(f"**System**: {part.content}")
# user-prompt
elif part.part_kind == 'user-prompt':
with st.chat_message("user"):
st.markdown(part.content)
# text
elif part.part_kind == 'text':
with st.chat_message("assistant"):
st.markdown(part.content)
async def run_agent_with_streaming(user_input: str):
"""
Run the agent with streaming text for the user_input prompt,
while maintaining the entire conversation in `st.session_state.messages`.
"""
# Prepare dependencies
deps = PydanticAIDeps(
supabase=supabase,
openai_client=openai_client
)
# Run the agent in a stream
async with pydantic_ai_coder.run_stream(
user_input,
deps=deps,
message_history= st.session_state.messages[:-1], # pass entire conversation so far
) as result:
# We'll gather partial text to show incrementally
partial_text = ""
message_placeholder = st.empty()
# Render partial text as it arrives
async for chunk in result.stream_text(delta=True):
partial_text += chunk
message_placeholder.markdown(partial_text)
# Now that the stream is finished, we have a final result.
# Add new messages from this run, excluding user-prompt messages
filtered_messages = [msg for msg in result.new_messages()
if not (hasattr(msg, 'parts') and
any(part.part_kind == 'user-prompt' for part in msg.parts))]
st.session_state.messages.extend(filtered_messages)
# Add the final response to the messages
st.session_state.messages.append(
ModelResponse(parts=[TextPart(content=partial_text)])
)
async def main():
st.title("Archon - Agent Builder")
st.write("Describe to me an AI agent you want to build and I'll code it for you with Pydantic AI.")
# Initialize chat history in session state if not present
if "messages" not in st.session_state:
st.session_state.messages = []
# Display all messages from the conversation so far
# Each message is either a ModelRequest or ModelResponse.
# We iterate over their parts to decide how to display them.
for msg in st.session_state.messages:
if isinstance(msg, ModelRequest) or isinstance(msg, ModelResponse):
for part in msg.parts:
display_message_part(part)
# Chat input for the user
user_input = st.chat_input("What do you want to build today?")
if user_input:
# We append a new request to the conversation explicitly
st.session_state.messages.append(
ModelRequest(parts=[UserPromptPart(content=user_input)])
)
# Display user prompt in the UI
with st.chat_message("user"):
st.markdown(user_input)
# Display the assistant's partial response while streaming
with st.chat_message("assistant"):
# Actually run the agent now, streaming the text
await run_agent_with_streaming(user_input)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,33 @@
# Base URL for the OpenAI instance (default is https://api.openai.com/v1)
# OpenAI: https://api.openai.com/v1
# Ollama (example): http://localhost:11434/v1
# OpenRouter: https://openrouter.ai/api/v1
BASE_URL=
# Get your Open AI API Key by following these instructions -
# https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key
# Even if using OpenRouter/Ollama, you still need to set this for the embedding model.
# Future versions of Archon will be more flexible with this.
OPENAI_API_KEY=
# For OpenAI: https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key
# For OpenRouter: https://openrouter.ai/keys
LLM_API_KEY=
# For the Supabase version (sample_supabase_agent.py), set your Supabase URL and Service Key.
# Get your SUPABASE_URL from the API section of your Supabase project settings -
# https://supabase.com/dashboard/project/<your project ID>/settings/api
SUPABASE_URL=
# Get your SUPABASE_SERVICE_KEY from the API section of your Supabase project settings -
# https://supabase.com/dashboard/project/<your project ID>/settings/api
# On this page it is called the service_role secret.
SUPABASE_SERVICE_KEY=
# The LLM you want to use for the reasoner (o3-mini, R1, QwQ, etc.).
# Example: o3-mini
REASONER_MODEL=
# The LLM you want to use for the primary agent/coder.
# Example: gpt-4o-mini
PRIMARY_MODEL=

View File

@ -0,0 +1,132 @@
# Archon V2 - Agentic Workflow for Building Pydantic AI Agents
This is the second iteration of the Archon project, building upon V1 by introducing LangGraph for a full agentic workflow. The system starts with a reasoning LLM (like O3-mini or R1) that analyzes user requirements and documentation to create a detailed scope, which then guides specialized coding and routing agents in generating high-quality Pydantic AI agents.
An intelligent documentation crawler and RAG (Retrieval-Augmented Generation) system built using Pydantic AI, LangGraph, and Supabase that is capable of building other Pydantic AI agents. The system crawls the Pydantic AI documentation, stores content in a vector database, and provides Pydantic AI agent code by retrieving and analyzing relevant documentation chunks.
This version also supports local LLMs with Ollama for the main agent and reasoning LLM.
Note that we are still relying on OpenAI for embeddings no matter what, but future versions of Archon will change that.
## Features
- Multi-agent workflow using LangGraph
- Specialized agents for reasoning, routing, and coding
- Pydantic AI documentation crawling and chunking
- Vector database storage with Supabase
- Semantic search using OpenAI embeddings
- RAG-based question answering
- Support for code block preservation
- Streamlit UI for interactive querying
## Prerequisites
- Python 3.11+
- Supabase account and database
- OpenAI/OpenRouter API key or Ollama for local LLMs
- Streamlit (for web interface)
## Installation
1. Clone the repository:
```bash
git clone https://github.com/coleam00/archon.git
cd archon/iterations/v2-agentic-workflow
```
2. Install dependencies (recommended to use a Python virtual environment):
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
```
3. Set up environment variables:
- Rename `.env.example` to `.env`
- Edit `.env` with your API keys and preferences:
```env
BASE_URL=https://api.openai.com/v1 for OpenAI, https://api.openrouter.ai/v1 for OpenRouter, or your Ollama URL
LLM_API_KEY=your_openai_or_openrouter_api_key
OPENAI_API_KEY=your_openai_api_key
SUPABASE_URL=your_supabase_url
SUPABASE_SERVICE_KEY=your_supabase_service_key
PRIMARY_MODEL=gpt-4o-mini # or your preferred OpenAI model for main agent
REASONER_MODEL=o3-mini # or your preferred OpenAI model for reasoning
```
## Usage
### Database Setup
Execute the SQL commands in `site_pages.sql` to:
1. Create the necessary tables
2. Enable vector similarity search
3. Set up Row Level Security policies
In Supabase, do this by going to the "SQL Editor" tab and pasting in the SQL into the editor there. Then click "Run".
### Crawl Documentation
To crawl and store documentation in the vector database:
```bash
python crawl_pydantic_ai_docs.py
```
This will:
1. Fetch URLs from the documentation sitemap
2. Crawl each page and split into chunks
3. Generate embeddings and store in Supabase
### Chunking Configuration
You can configure chunking parameters in `crawl_pydantic_ai_docs.py`:
```python
chunk_size = 5000 # Characters per chunk
```
The chunker intelligently preserves:
- Code blocks
- Paragraph boundaries
- Sentence boundaries
### Streamlit Web Interface
For an interactive web interface to query the documentation and create agents:
```bash
streamlit run streamlit_ui.py
```
The interface will be available at `http://localhost:8501`
## Configuration
### Database Schema
The Supabase database uses the following schema:
```sql
CREATE TABLE site_pages (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
url TEXT,
chunk_number INTEGER,
title TEXT,
summary TEXT,
content TEXT,
metadata JSONB,
embedding VECTOR(1536)
);
```
## Project Structure
- `archon_graph.py`: LangGraph workflow definition and agent coordination
- `pydantic_ai_coder.py`: Main coding agent with RAG capabilities
- `crawl_pydantic_ai_docs.py`: Documentation crawler and processor
- `streamlit_ui.py`: Web interface with streaming support
- `site_pages.sql`: Database setup commands
- `requirements.txt`: Project dependencies
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.

View File

@ -0,0 +1,201 @@
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai import Agent, RunContext
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated, List, Any
from langgraph.config import get_stream_writer
from langgraph.types import interrupt
from dotenv import load_dotenv
from openai import AsyncOpenAI
from supabase import Client
import logfire
import os
# Import the message classes from Pydantic AI
from pydantic_ai.messages import (
ModelMessage,
ModelMessagesTypeAdapter
)
from pydantic_ai_coder import pydantic_ai_coder, PydanticAIDeps, list_documentation_pages_helper
# Load environment variables
load_dotenv()
# Configure logfire to suppress warnings (optional)
logfire.configure(send_to_logfire='never')
base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1')
api_key = os.getenv('LLM_API_KEY', 'no-llm-api-key-provided')
is_ollama = "localhost" in base_url.lower()
reasoner_llm_model = os.getenv('REASONER_MODEL', 'o3-mini')
reasoner = Agent(
OpenAIModel(reasoner_llm_model, base_url=base_url, api_key=api_key),
system_prompt='You are an expert at coding AI agents with Pydantic AI and defining the scope for doing so.',
)
primary_llm_model = os.getenv('PRIMARY_MODEL', 'gpt-4o-mini')
router_agent = Agent(
OpenAIModel(primary_llm_model, base_url=base_url, api_key=api_key),
system_prompt='Your job is to route the user message either to the end of the conversation or to continue coding the AI agent.',
)
end_conversation_agent = Agent(
OpenAIModel(primary_llm_model, base_url=base_url, api_key=api_key),
system_prompt='Your job is to end a conversation for creating an AI agent by giving instructions for how to execute the agent and they saying a nice goodbye to the user.',
)
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
supabase: Client = Client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_SERVICE_KEY")
)
# Define state schema
class AgentState(TypedDict):
latest_user_message: str
messages: Annotated[List[bytes], lambda x, y: x + y]
scope: str
# Scope Definition Node with Reasoner LLM
async def define_scope_with_reasoner(state: AgentState):
# First, get the documentation pages so the reasoner can decide which ones are necessary
documentation_pages = await list_documentation_pages_helper(supabase)
documentation_pages_str = "\n".join(documentation_pages)
# Then, use the reasoner to define the scope
prompt = f"""
User AI Agent Request: {state['latest_user_message']}
Create detailed scope document for the AI agent including:
- Architecture diagram
- Core components
- External dependencies
- Testing strategy
Also based on these documentation pages available:
{documentation_pages_str}
Include a list of documentation pages that are relevant to creating this agent for the user in the scope document.
"""
result = await reasoner.run(prompt)
scope = result.data
# Save the scope to a file
scope_path = os.path.join("workbench", "scope.md")
os.makedirs("workbench", exist_ok=True)
with open(scope_path, "w", encoding="utf-8") as f:
f.write(scope)
return {"scope": scope}
# Coding Node with Feedback Handling
async def coder_agent(state: AgentState, writer):
# Prepare dependencies
deps = PydanticAIDeps(
supabase=supabase,
openai_client=openai_client,
reasoner_output=state['scope']
)
# Get the message history into the format for Pydantic AI
message_history: list[ModelMessage] = []
for message_row in state['messages']:
message_history.extend(ModelMessagesTypeAdapter.validate_json(message_row))
# Run the agent in a stream
if is_ollama:
writer = get_stream_writer()
result = await pydantic_ai_coder.run(state['latest_user_message'], deps=deps, message_history= message_history)
writer(result.data)
else:
async with pydantic_ai_coder.run_stream(
state['latest_user_message'],
deps=deps,
message_history= message_history
) as result:
# Stream partial text as it arrives
async for chunk in result.stream_text(delta=True):
writer(chunk)
# print(ModelMessagesTypeAdapter.validate_json(result.new_messages_json()))
return {"messages": [result.new_messages_json()]}
# Interrupt the graph to get the user's next message
def get_next_user_message(state: AgentState):
value = interrupt({})
# Set the user's latest message for the LLM to continue the conversation
return {
"latest_user_message": value
}
# Determine if the user is finished creating their AI agent or not
async def route_user_message(state: AgentState):
prompt = f"""
The user has sent a message:
{state['latest_user_message']}
If the user wants to end the conversation, respond with just the text "finish_conversation".
If the user wants to continue coding the AI agent, respond with just the text "coder_agent".
"""
result = await router_agent.run(prompt)
next_action = result.data
if next_action == "finish_conversation":
return "finish_conversation"
else:
return "coder_agent"
# End of conversation agent to give instructions for executing the agent
async def finish_conversation(state: AgentState, writer):
# Get the message history into the format for Pydantic AI
message_history: list[ModelMessage] = []
for message_row in state['messages']:
message_history.extend(ModelMessagesTypeAdapter.validate_json(message_row))
# Run the agent in a stream
if is_ollama:
writer = get_stream_writer()
result = await end_conversation_agent.run(state['latest_user_message'], message_history= message_history)
writer(result.data)
else:
async with end_conversation_agent.run_stream(
state['latest_user_message'],
message_history= message_history
) as result:
# Stream partial text as it arrives
async for chunk in result.stream_text(delta=True):
writer(chunk)
return {"messages": [result.new_messages_json()]}
# Build workflow
builder = StateGraph(AgentState)
# Add nodes
builder.add_node("define_scope_with_reasoner", define_scope_with_reasoner)
builder.add_node("coder_agent", coder_agent)
builder.add_node("get_next_user_message", get_next_user_message)
builder.add_node("finish_conversation", finish_conversation)
# Set edges
builder.add_edge(START, "define_scope_with_reasoner")
builder.add_edge("define_scope_with_reasoner", "coder_agent")
builder.add_edge("coder_agent", "get_next_user_message")
builder.add_conditional_edges(
"get_next_user_message",
route_user_message,
{"coder_agent": "coder_agent", "finish_conversation": "finish_conversation"}
)
builder.add_edge("finish_conversation", END)
# Configure persistence
memory = MemorySaver()
agentic_flow = builder.compile(checkpointer=memory)

View File

@ -0,0 +1,245 @@
import os
import sys
import json
import asyncio
import requests
from xml.etree import ElementTree
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime, timezone
from urllib.parse import urlparse
from dotenv import load_dotenv
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from openai import AsyncOpenAI
from supabase import create_client, Client
load_dotenv()
# Initialize OpenAI and Supabase clients
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
supabase: Client = create_client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_SERVICE_KEY")
)
@dataclass
class ProcessedChunk:
url: str
chunk_number: int
title: str
summary: str
content: str
metadata: Dict[str, Any]
embedding: List[float]
def chunk_text(text: str, chunk_size: int = 5000) -> List[str]:
"""Split text into chunks, respecting code blocks and paragraphs."""
chunks = []
start = 0
text_length = len(text)
while start < text_length:
# Calculate end position
end = start + chunk_size
# If we're at the end of the text, just take what's left
if end >= text_length:
chunks.append(text[start:].strip())
break
# Try to find a code block boundary first (```)
chunk = text[start:end]
code_block = chunk.rfind('```')
if code_block != -1 and code_block > chunk_size * 0.3:
end = start + code_block
# If no code block, try to break at a paragraph
elif '\n\n' in chunk:
# Find the last paragraph break
last_break = chunk.rfind('\n\n')
if last_break > chunk_size * 0.3: # Only break if we're past 30% of chunk_size
end = start + last_break
# If no paragraph break, try to break at a sentence
elif '. ' in chunk:
# Find the last sentence break
last_period = chunk.rfind('. ')
if last_period > chunk_size * 0.3: # Only break if we're past 30% of chunk_size
end = start + last_period + 1
# Extract chunk and clean it up
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
# Move start position for next chunk
start = max(start + 1, end)
return chunks
async def get_title_and_summary(chunk: str, url: str) -> Dict[str, str]:
"""Extract title and summary using GPT-4."""
system_prompt = """You are an AI that extracts titles and summaries from documentation chunks.
Return a JSON object with 'title' and 'summary' keys.
For the title: If this seems like the start of a document, extract its title. If it's a middle chunk, derive a descriptive title.
For the summary: Create a concise summary of the main points in this chunk.
Keep both title and summary concise but informative."""
try:
response = await openai_client.chat.completions.create(
model=os.getenv("LLM_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"URL: {url}\n\nContent:\n{chunk[:1000]}..."} # Send first 1000 chars for context
],
response_format={ "type": "json_object" }
)
return json.loads(response.choices[0].message.content)
except Exception as e:
print(f"Error getting title and summary: {e}")
return {"title": "Error processing title", "summary": "Error processing summary"}
async def get_embedding(text: str) -> List[float]:
"""Get embedding vector from OpenAI."""
try:
response = await openai_client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Error getting embedding: {e}")
return [0] * 1536 # Return zero vector on error
async def process_chunk(chunk: str, chunk_number: int, url: str) -> ProcessedChunk:
"""Process a single chunk of text."""
# Get title and summary
extracted = await get_title_and_summary(chunk, url)
# Get embedding
embedding = await get_embedding(chunk)
# Create metadata
metadata = {
"source": "pydantic_ai_docs",
"chunk_size": len(chunk),
"crawled_at": datetime.now(timezone.utc).isoformat(),
"url_path": urlparse(url).path
}
return ProcessedChunk(
url=url,
chunk_number=chunk_number,
title=extracted['title'],
summary=extracted['summary'],
content=chunk, # Store the original chunk content
metadata=metadata,
embedding=embedding
)
async def insert_chunk(chunk: ProcessedChunk):
"""Insert a processed chunk into Supabase."""
try:
data = {
"url": chunk.url,
"chunk_number": chunk.chunk_number,
"title": chunk.title,
"summary": chunk.summary,
"content": chunk.content,
"metadata": chunk.metadata,
"embedding": chunk.embedding
}
result = supabase.table("site_pages").insert(data).execute()
print(f"Inserted chunk {chunk.chunk_number} for {chunk.url}")
return result
except Exception as e:
print(f"Error inserting chunk: {e}")
return None
async def process_and_store_document(url: str, markdown: str):
"""Process a document and store its chunks in parallel."""
# Split into chunks
chunks = chunk_text(markdown)
# Process chunks in parallel
tasks = [
process_chunk(chunk, i, url)
for i, chunk in enumerate(chunks)
]
processed_chunks = await asyncio.gather(*tasks)
# Store chunks in parallel
insert_tasks = [
insert_chunk(chunk)
for chunk in processed_chunks
]
await asyncio.gather(*insert_tasks)
async def crawl_parallel(urls: List[str], max_concurrent: int = 5):
"""Crawl multiple URLs in parallel with a concurrency limit."""
browser_config = BrowserConfig(
headless=True,
verbose=False,
extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
)
crawl_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
# Create the crawler instance
crawler = AsyncWebCrawler(config=browser_config)
await crawler.start()
try:
# Create a semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_concurrent)
async def process_url(url: str):
async with semaphore:
result = await crawler.arun(
url=url,
config=crawl_config,
session_id="session1"
)
if result.success:
print(f"Successfully crawled: {url}")
await process_and_store_document(url, result.markdown_v2.raw_markdown)
else:
print(f"Failed: {url} - Error: {result.error_message}")
# Process all URLs in parallel with limited concurrency
await asyncio.gather(*[process_url(url) for url in urls])
finally:
await crawler.close()
def get_pydantic_ai_docs_urls() -> List[str]:
"""Get URLs from Pydantic AI docs sitemap."""
sitemap_url = "https://ai.pydantic.dev/sitemap.xml"
try:
response = requests.get(sitemap_url)
response.raise_for_status()
# Parse the XML
root = ElementTree.fromstring(response.content)
# Extract all URLs from the sitemap
namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
urls = [loc.text for loc in root.findall('.//ns:loc', namespace)]
return urls
except Exception as e:
print(f"Error fetching sitemap: {e}")
return []
async def main():
# Get URLs from Pydantic AI docs
urls = get_pydantic_ai_docs_urls()
if not urls:
print("No URLs found to crawl")
return
print(f"Found {len(urls)} URLs to crawl")
await crawl_parallel(urls)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,7 @@
{
"dependencies": ["."],
"graphs": {
"agent": "./archon_graph.py:agentic_flow"
},
"env": ".env"
}

View File

@ -0,0 +1,219 @@
from __future__ import annotations as _annotations
from dataclasses import dataclass
from dotenv import load_dotenv
import logfire
import asyncio
import httpx
import os
from pydantic_ai import Agent, ModelRetry, RunContext
from pydantic_ai.models.openai import OpenAIModel
from openai import AsyncOpenAI
from supabase import Client
from typing import List
load_dotenv()
llm = os.getenv('PRIMARY_MODEL', 'gpt-4o-mini')
base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1')
api_key = os.getenv('LLM_API_KEY', 'no-llm-api-key-provided')
model = OpenAIModel(llm, base_url=base_url, api_key=api_key)
logfire.configure(send_to_logfire='if-token-present')
@dataclass
class PydanticAIDeps:
supabase: Client
openai_client: AsyncOpenAI
reasoner_output: str
system_prompt = """
~~ CONTEXT: ~~
You are an expert at Pydantic AI - a Python AI agent framework that you have access to all the documentation to,
including examples, an API reference, and other resources to help you build Pydantic AI agents.
~~ GOAL: ~~
Your only job is to help the user create an AI agent with Pydantic AI.
The user will describe the AI agent they want to build, or if they don't, guide them towards doing so.
You will take their requirements, and then search through the Pydantic AI documentation with the tools provided
to find all the necessary information to create the AI agent with correct code.
It's important for you to search through multiple Pydantic AI documentation pages to get all the information you need.
Almost never stick to just one page - use RAG and the other documentation tools multiple times when you are creating
an AI agent from scratch for the user.
~~ STRUCTURE: ~~
When you build an AI agent from scratch, split the agent into this files and give the code for each:
- `agent.py`: The main agent file, which is where the Pydantic AI agent is defined.
- `agent_tools.py`: A tools file for the agent, which is where all the tool functions are defined. Use this for more complex agents.
- `agent_prompts.py`: A prompts file for the agent, which includes all system prompts and other prompts used by the agent. Use this when there are many prompts or large ones.
- `.env.example`: An example `.env` file - specify each variable that the user will need to fill in and a quick comment above each one for how to do so.
- `requirements.txt`: Don't include any versions, just the top level package names needed for the agent.
~~ INSTRUCTIONS: ~~
- Don't ask the user before taking an action, just do it. Always make sure you look at the documentation with the provided tools before writing any code.
- When you first look at the documentation, always start with RAG.
Then also always check the list of available documentation pages and retrieve the content of page(s) if it'll help.
- Always let the user know when you didn't find the answer in the documentation or the right URL - be honest.
- Helpful tip: when starting a new AI agent build, it's a good idea to look at the 'weather agent' in the docs as an example.
- When starting a new AI agent build, always produce the full code for the AI agent - never tell the user to finish a tool/function.
- When refining an existing AI agent build in a conversation, just share the code changes necessary.
- Each time you respond to the user, ask them to let you know either if they need changes or the code looks good.
"""
pydantic_ai_coder = Agent(
model,
system_prompt=system_prompt,
deps_type=PydanticAIDeps,
retries=2
)
@pydantic_ai_coder.system_prompt
def add_reasoner_output(ctx: RunContext[str]) -> str:
return f"""
\n\nAdditional thoughts/instructions from the reasoner LLM.
This scope includes documentation pages for you to search as well:
{ctx.deps.reasoner_output}
"""
# Add this in to get some crazy tool calling:
# You must get ALL documentation pages listed in the scope.
async def get_embedding(text: str, openai_client: AsyncOpenAI) -> List[float]:
"""Get embedding vector from OpenAI."""
try:
response = await openai_client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Error getting embedding: {e}")
return [0] * 1536 # Return zero vector on error
@pydantic_ai_coder.tool
async def retrieve_relevant_documentation(ctx: RunContext[PydanticAIDeps], user_query: str) -> str:
"""
Retrieve relevant documentation chunks based on the query with RAG.
Args:
ctx: The context including the Supabase client and OpenAI client
user_query: The user's question or query
Returns:
A formatted string containing the top 5 most relevant documentation chunks
"""
try:
# Get the embedding for the query
query_embedding = await get_embedding(user_query, ctx.deps.openai_client)
# Query Supabase for relevant documents
result = ctx.deps.supabase.rpc(
'match_site_pages',
{
'query_embedding': query_embedding,
'match_count': 5,
'filter': {'source': 'pydantic_ai_docs'}
}
).execute()
if not result.data:
return "No relevant documentation found."
# Format the results
formatted_chunks = []
for doc in result.data:
chunk_text = f"""
# {doc['title']}
{doc['content']}
"""
formatted_chunks.append(chunk_text)
# Join all chunks with a separator
return "\n\n---\n\n".join(formatted_chunks)
except Exception as e:
print(f"Error retrieving documentation: {e}")
return f"Error retrieving documentation: {str(e)}"
async def list_documentation_pages_helper(supabase: Client) -> List[str]:
"""
Function to retrieve a list of all available Pydantic AI documentation pages.
This is called by the list_documentation_pages tool and also externally
to fetch documentation pages for the reasoner LLM.
Returns:
List[str]: List of unique URLs for all documentation pages
"""
try:
# Query Supabase for unique URLs where source is pydantic_ai_docs
result = supabase.from_('site_pages') \
.select('url') \
.eq('metadata->>source', 'pydantic_ai_docs') \
.execute()
if not result.data:
return []
# Extract unique URLs
urls = sorted(set(doc['url'] for doc in result.data))
return urls
except Exception as e:
print(f"Error retrieving documentation pages: {e}")
return []
@pydantic_ai_coder.tool
async def list_documentation_pages(ctx: RunContext[PydanticAIDeps]) -> List[str]:
"""
Retrieve a list of all available Pydantic AI documentation pages.
Returns:
List[str]: List of unique URLs for all documentation pages
"""
return await list_documentation_pages_helper(ctx.deps.supabase)
@pydantic_ai_coder.tool
async def get_page_content(ctx: RunContext[PydanticAIDeps], url: str) -> str:
"""
Retrieve the full content of a specific documentation page by combining all its chunks.
Args:
ctx: The context including the Supabase client
url: The URL of the page to retrieve
Returns:
str: The complete page content with all chunks combined in order
"""
try:
# Query Supabase for all chunks of this URL, ordered by chunk_number
result = ctx.deps.supabase.from_('site_pages') \
.select('title, content, chunk_number') \
.eq('url', url) \
.eq('metadata->>source', 'pydantic_ai_docs') \
.order('chunk_number') \
.execute()
if not result.data:
return f"No content found for URL: {url}"
# Format the page with its title and all chunks
page_title = result.data[0]['title'].split(' - ')[0] # Get the main title
formatted_content = [f"# {page_title}\n"]
# Add each chunk's content
for chunk in result.data:
formatted_content.append(chunk['content'])
# Join everything together
return "\n\n".join(formatted_content)
except Exception as e:
print(f"Error retrieving page content: {e}")
return f"Error retrieving page content: {str(e)}"

Binary file not shown.

View File

@ -0,0 +1,72 @@
-- Enable the pgvector extension
create extension if not exists vector;
-- Create the documentation chunks table
create table site_pages (
id bigserial primary key,
url varchar not null,
chunk_number integer not null,
title varchar not null,
summary varchar not null,
content text not null, -- Added content column
metadata jsonb not null default '{}'::jsonb, -- Added metadata column
embedding vector(1536), -- OpenAI embeddings are 1536 dimensions
created_at timestamp with time zone default timezone('utc'::text, now()) not null,
-- Add a unique constraint to prevent duplicate chunks for the same URL
unique(url, chunk_number)
);
-- Create an index for better vector similarity search performance
create index on site_pages using ivfflat (embedding vector_cosine_ops);
-- Create an index on metadata for faster filtering
create index idx_site_pages_metadata on site_pages using gin (metadata);
-- Create a function to search for documentation chunks
create function match_site_pages (
query_embedding vector(1536),
match_count int default 10,
filter jsonb DEFAULT '{}'::jsonb
) returns table (
id bigint,
url varchar,
chunk_number integer,
title varchar,
summary varchar,
content text,
metadata jsonb,
similarity float
)
language plpgsql
as $$
#variable_conflict use_column
begin
return query
select
id,
url,
chunk_number,
title,
summary,
content,
metadata,
1 - (site_pages.embedding <=> query_embedding) as similarity
from site_pages
where metadata @> filter
order by site_pages.embedding <=> query_embedding
limit match_count;
end;
$$;
-- Everything above will work for any PostgreSQL database. The below commands are for Supabase security
-- Enable RLS on the table
alter table site_pages enable row level security;
-- Create a policy that allows anyone to read
create policy "Allow public read access"
on site_pages
for select
to public
using (true);

View File

@ -0,0 +1,114 @@
from __future__ import annotations
from typing import Literal, TypedDict
from langgraph.types import Command
from openai import AsyncOpenAI
from supabase import Client
import streamlit as st
import logfire
import asyncio
import json
import uuid
import os
# Import all the message part classes
from pydantic_ai.messages import (
ModelMessage,
ModelRequest,
ModelResponse,
SystemPromptPart,
UserPromptPart,
TextPart,
ToolCallPart,
ToolReturnPart,
RetryPromptPart,
ModelMessagesTypeAdapter
)
from archon_graph import agentic_flow
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
supabase: Client = Client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_SERVICE_KEY")
)
# Configure logfire to suppress warnings (optional)
logfire.configure(send_to_logfire='never')
@st.cache_resource
def get_thread_id():
return str(uuid.uuid4())
thread_id = get_thread_id()
async def run_agent_with_streaming(user_input: str):
"""
Run the agent with streaming text for the user_input prompt,
while maintaining the entire conversation in `st.session_state.messages`.
"""
config = {
"configurable": {
"thread_id": thread_id
}
}
# First message from user
if len(st.session_state.messages) == 1:
async for msg in agentic_flow.astream(
{"latest_user_message": user_input}, config, stream_mode="custom"
):
yield msg
# Continue the conversation
else:
async for msg in agentic_flow.astream(
Command(resume=user_input), config, stream_mode="custom"
):
yield msg
async def main():
st.title("Archon - Agent Builder")
st.write("Describe to me an AI agent you want to build and I'll code it for you with Pydantic AI.")
st.write("Example: Build me an AI agent that can search the web with the Brave API.")
# Initialize chat history in session state if not present
if "messages" not in st.session_state:
st.session_state.messages = []
# Display chat messages from history on app rerun
for message in st.session_state.messages:
message_type = message["type"]
if message_type in ["human", "ai", "system"]:
with st.chat_message(message_type):
st.markdown(message["content"])
# Chat input for the user
user_input = st.chat_input("What do you want to build today?")
if user_input:
# We append a new request to the conversation explicitly
st.session_state.messages.append({"type": "human", "content": user_input})
# Display user prompt in the UI
with st.chat_message("user"):
st.markdown(user_input)
# Display assistant response in chat message container
response_content = ""
with st.chat_message("assistant"):
message_placeholder = st.empty() # Placeholder for updating the message
# Run the async generator to fetch responses
async for chunk in run_agent_with_streaming(user_input):
response_content += chunk
# Update the placeholder with the current response content
message_placeholder.markdown(response_content)
st.session_state.messages.append({"type": "ai", "content": response_content})
if __name__ == "__main__":
asyncio.run(main())

7
langgraph.json Normal file
View File

@ -0,0 +1,7 @@
{
"dependencies": ["."],
"graphs": {
"agent": "./archon_graph.py:agentic_flow"
},
"env": ".env"
}

BIN
public/Archon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 576 KiB

219
pydantic_ai_coder.py Normal file
View File

@ -0,0 +1,219 @@
from __future__ import annotations as _annotations
from dataclasses import dataclass
from dotenv import load_dotenv
import logfire
import asyncio
import httpx
import os
from pydantic_ai import Agent, ModelRetry, RunContext
from pydantic_ai.models.openai import OpenAIModel
from openai import AsyncOpenAI
from supabase import Client
from typing import List
load_dotenv()
llm = os.getenv('PRIMARY_MODEL', 'gpt-4o-mini')
base_url = os.getenv('BASE_URL', 'https://api.openai.com/v1')
api_key = os.getenv('LLM_API_KEY', 'no-llm-api-key-provided')
model = OpenAIModel(llm, base_url=base_url, api_key=api_key)
logfire.configure(send_to_logfire='if-token-present')
@dataclass
class PydanticAIDeps:
supabase: Client
openai_client: AsyncOpenAI
reasoner_output: str
system_prompt = """
~~ CONTEXT: ~~
You are an expert at Pydantic AI - a Python AI agent framework that you have access to all the documentation to,
including examples, an API reference, and other resources to help you build Pydantic AI agents.
~~ GOAL: ~~
Your only job is to help the user create an AI agent with Pydantic AI.
The user will describe the AI agent they want to build, or if they don't, guide them towards doing so.
You will take their requirements, and then search through the Pydantic AI documentation with the tools provided
to find all the necessary information to create the AI agent with correct code.
It's important for you to search through multiple Pydantic AI documentation pages to get all the information you need.
Almost never stick to just one page - use RAG and the other documentation tools multiple times when you are creating
an AI agent from scratch for the user.
~~ STRUCTURE: ~~
When you build an AI agent from scratch, split the agent into this files and give the code for each:
- `agent.py`: The main agent file, which is where the Pydantic AI agent is defined.
- `agent_tools.py`: A tools file for the agent, which is where all the tool functions are defined. Use this for more complex agents.
- `agent_prompts.py`: A prompts file for the agent, which includes all system prompts and other prompts used by the agent. Use this when there are many prompts or large ones.
- `.env.example`: An example `.env` file - specify each variable that the user will need to fill in and a quick comment above each one for how to do so.
- `requirements.txt`: Don't include any versions, just the top level package names needed for the agent.
~~ INSTRUCTIONS: ~~
- Don't ask the user before taking an action, just do it. Always make sure you look at the documentation with the provided tools before writing any code.
- When you first look at the documentation, always start with RAG.
Then also always check the list of available documentation pages and retrieve the content of page(s) if it'll help.
- Always let the user know when you didn't find the answer in the documentation or the right URL - be honest.
- Helpful tip: when starting a new AI agent build, it's a good idea to look at the 'weather agent' in the docs as an example.
- When starting a new AI agent build, always produce the full code for the AI agent - never tell the user to finish a tool/function.
- When refining an existing AI agent build in a conversation, just share the code changes necessary.
- Each time you respond to the user, ask them to let you know either if they need changes or the code looks good.
"""
pydantic_ai_coder = Agent(
model,
system_prompt=system_prompt,
deps_type=PydanticAIDeps,
retries=2
)
@pydantic_ai_coder.system_prompt
def add_reasoner_output(ctx: RunContext[str]) -> str:
return f"""
\n\nAdditional thoughts/instructions from the reasoner LLM.
This scope includes documentation pages for you to search as well:
{ctx.deps.reasoner_output}
"""
# Add this in to get some crazy tool calling:
# You must get ALL documentation pages listed in the scope.
async def get_embedding(text: str, openai_client: AsyncOpenAI) -> List[float]:
"""Get embedding vector from OpenAI."""
try:
response = await openai_client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Error getting embedding: {e}")
return [0] * 1536 # Return zero vector on error
@pydantic_ai_coder.tool
async def retrieve_relevant_documentation(ctx: RunContext[PydanticAIDeps], user_query: str) -> str:
"""
Retrieve relevant documentation chunks based on the query with RAG.
Args:
ctx: The context including the Supabase client and OpenAI client
user_query: The user's question or query
Returns:
A formatted string containing the top 5 most relevant documentation chunks
"""
try:
# Get the embedding for the query
query_embedding = await get_embedding(user_query, ctx.deps.openai_client)
# Query Supabase for relevant documents
result = ctx.deps.supabase.rpc(
'match_site_pages',
{
'query_embedding': query_embedding,
'match_count': 5,
'filter': {'source': 'pydantic_ai_docs'}
}
).execute()
if not result.data:
return "No relevant documentation found."
# Format the results
formatted_chunks = []
for doc in result.data:
chunk_text = f"""
# {doc['title']}
{doc['content']}
"""
formatted_chunks.append(chunk_text)
# Join all chunks with a separator
return "\n\n---\n\n".join(formatted_chunks)
except Exception as e:
print(f"Error retrieving documentation: {e}")
return f"Error retrieving documentation: {str(e)}"
async def list_documentation_pages_helper(supabase: Client) -> List[str]:
"""
Function to retrieve a list of all available Pydantic AI documentation pages.
This is called by the list_documentation_pages tool and also externally
to fetch documentation pages for the reasoner LLM.
Returns:
List[str]: List of unique URLs for all documentation pages
"""
try:
# Query Supabase for unique URLs where source is pydantic_ai_docs
result = supabase.from_('site_pages') \
.select('url') \
.eq('metadata->>source', 'pydantic_ai_docs') \
.execute()
if not result.data:
return []
# Extract unique URLs
urls = sorted(set(doc['url'] for doc in result.data))
return urls
except Exception as e:
print(f"Error retrieving documentation pages: {e}")
return []
@pydantic_ai_coder.tool
async def list_documentation_pages(ctx: RunContext[PydanticAIDeps]) -> List[str]:
"""
Retrieve a list of all available Pydantic AI documentation pages.
Returns:
List[str]: List of unique URLs for all documentation pages
"""
return await list_documentation_pages_helper(ctx.deps.supabase)
@pydantic_ai_coder.tool
async def get_page_content(ctx: RunContext[PydanticAIDeps], url: str) -> str:
"""
Retrieve the full content of a specific documentation page by combining all its chunks.
Args:
ctx: The context including the Supabase client
url: The URL of the page to retrieve
Returns:
str: The complete page content with all chunks combined in order
"""
try:
# Query Supabase for all chunks of this URL, ordered by chunk_number
result = ctx.deps.supabase.from_('site_pages') \
.select('title, content, chunk_number') \
.eq('url', url) \
.eq('metadata->>source', 'pydantic_ai_docs') \
.order('chunk_number') \
.execute()
if not result.data:
return f"No content found for URL: {url}"
# Format the page with its title and all chunks
page_title = result.data[0]['title'].split(' - ')[0] # Get the main title
formatted_content = [f"# {page_title}\n"]
# Add each chunk's content
for chunk in result.data:
formatted_content.append(chunk['content'])
# Join everything together
return "\n\n".join(formatted_content)
except Exception as e:
print(f"Error retrieving page content: {e}")
return f"Error retrieving page content: {str(e)}"

BIN
requirements.txt Normal file

Binary file not shown.

72
site_pages.sql Normal file
View File

@ -0,0 +1,72 @@
-- Enable the pgvector extension
create extension if not exists vector;
-- Create the documentation chunks table
create table site_pages (
id bigserial primary key,
url varchar not null,
chunk_number integer not null,
title varchar not null,
summary varchar not null,
content text not null, -- Added content column
metadata jsonb not null default '{}'::jsonb, -- Added metadata column
embedding vector(1536), -- OpenAI embeddings are 1536 dimensions
created_at timestamp with time zone default timezone('utc'::text, now()) not null,
-- Add a unique constraint to prevent duplicate chunks for the same URL
unique(url, chunk_number)
);
-- Create an index for better vector similarity search performance
create index on site_pages using ivfflat (embedding vector_cosine_ops);
-- Create an index on metadata for faster filtering
create index idx_site_pages_metadata on site_pages using gin (metadata);
-- Create a function to search for documentation chunks
create function match_site_pages (
query_embedding vector(1536),
match_count int default 10,
filter jsonb DEFAULT '{}'::jsonb
) returns table (
id bigint,
url varchar,
chunk_number integer,
title varchar,
summary varchar,
content text,
metadata jsonb,
similarity float
)
language plpgsql
as $$
#variable_conflict use_column
begin
return query
select
id,
url,
chunk_number,
title,
summary,
content,
metadata,
1 - (site_pages.embedding <=> query_embedding) as similarity
from site_pages
where metadata @> filter
order by site_pages.embedding <=> query_embedding
limit match_count;
end;
$$;
-- Everything above will work for any PostgreSQL database. The below commands are for Supabase security
-- Enable RLS on the table
alter table site_pages enable row level security;
-- Create a policy that allows anyone to read
create policy "Allow public read access"
on site_pages
for select
to public
using (true);

114
streamlit_ui.py Normal file
View File

@ -0,0 +1,114 @@
from __future__ import annotations
from typing import Literal, TypedDict
from langgraph.types import Command
from openai import AsyncOpenAI
from supabase import Client
import streamlit as st
import logfire
import asyncio
import json
import uuid
import os
# Import all the message part classes
from pydantic_ai.messages import (
ModelMessage,
ModelRequest,
ModelResponse,
SystemPromptPart,
UserPromptPart,
TextPart,
ToolCallPart,
ToolReturnPart,
RetryPromptPart,
ModelMessagesTypeAdapter
)
from archon_graph import agentic_flow
# Load environment variables
from dotenv import load_dotenv
load_dotenv()
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
supabase: Client = Client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_SERVICE_KEY")
)
# Configure logfire to suppress warnings (optional)
logfire.configure(send_to_logfire='never')
@st.cache_resource
def get_thread_id():
return str(uuid.uuid4())
thread_id = get_thread_id()
async def run_agent_with_streaming(user_input: str):
"""
Run the agent with streaming text for the user_input prompt,
while maintaining the entire conversation in `st.session_state.messages`.
"""
config = {
"configurable": {
"thread_id": thread_id
}
}
# First message from user
if len(st.session_state.messages) == 1:
async for msg in agentic_flow.astream(
{"latest_user_message": user_input}, config, stream_mode="custom"
):
yield msg
# Continue the conversation
else:
async for msg in agentic_flow.astream(
Command(resume=user_input), config, stream_mode="custom"
):
yield msg
async def main():
st.title("Archon - Agent Builder")
st.write("Describe to me an AI agent you want to build and I'll code it for you with Pydantic AI.")
st.write("Example: Build me an AI agent that can search the web with the Brave API.")
# Initialize chat history in session state if not present
if "messages" not in st.session_state:
st.session_state.messages = []
# Display chat messages from history on app rerun
for message in st.session_state.messages:
message_type = message["type"]
if message_type in ["human", "ai", "system"]:
with st.chat_message(message_type):
st.markdown(message["content"])
# Chat input for the user
user_input = st.chat_input("What do you want to build today?")
if user_input:
# We append a new request to the conversation explicitly
st.session_state.messages.append({"type": "human", "content": user_input})
# Display user prompt in the UI
with st.chat_message("user"):
st.markdown(user_input)
# Display assistant response in chat message container
response_content = ""
with st.chat_message("assistant"):
message_placeholder = st.empty() # Placeholder for updating the message
# Run the async generator to fetch responses
async for chunk in run_agent_with_streaming(user_input):
response_content += chunk
# Update the placeholder with the current response content
message_placeholder.markdown(response_content)
st.session_state.messages.append({"type": "ai", "content": response_content})
if __name__ == "__main__":
asyncio.run(main())