Fix crawler concurrency configuration to prevent memory crashes

Consolidate concurrent crawling limits into a single database setting
instead of a hardcoded special case for documentation sites.

Changes:
- Remove the hardcoded limit of 20 concurrent pages for documentation sites
- Let strategies read CRAWL_MAX_CONCURRENT from the database (default: 10)
- Apply consistent concurrency across all site types
- Improve code formatting and consistency

This fixes Playwright browser crashes caused by excessive concurrent
pages on documentation sites and provides a single configuration point
for tuning crawler performance.
Rasmus Widing 2025-08-15 15:45:04 +03:00
parent ad1b8bf70f
commit aab0721f0c
2 changed files with 290 additions and 218 deletions
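
For context, a minimal sketch of the settings-driven lookup that replaces the removed per-site special case. The credential_service call and the "rag_strategy" category are taken from the diff below; the resolve_max_concurrent helper and its signature are illustrative assumptions, not code from this commit.

from typing import Optional

async def resolve_max_concurrent(credential_service, override: Optional[int] = None) -> int:
    """Return the crawl concurrency limit: an explicit override wins, otherwise
    the CRAWL_MAX_CONCURRENT database setting, otherwise the default of 10."""
    # Illustrative helper only; the strategies in this commit load the same
    # settings category themselves (see the batch strategy diff below).
    if override is not None:
        return override
    try:
        settings = await credential_service.get_credentials_by_category("rag_strategy")
        return int(settings.get("CRAWL_MAX_CONCURRENT", "10"))
    except Exception:
        # Fall back to the documented default if settings cannot be loaded.
        return 10

Because every strategy resolves its limit the same way, tuning CRAWL_MAX_CONCURRENT in the database becomes the single point of control for crawler load, regardless of whether the target is a documentation site.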


@@ -19,14 +19,20 @@ from ...utils import get_supabase_client
update_crawl_progress = None
complete_crawl_progress = None
def _ensure_socketio_imports():
"""Ensure socket.IO handlers are imported."""
global update_crawl_progress, complete_crawl_progress
if update_crawl_progress is None:
from ...api_routes.socketio_handlers import update_crawl_progress as _update, complete_crawl_progress as _complete
from ...api_routes.socketio_handlers import (
update_crawl_progress as _update,
complete_crawl_progress as _complete,
)
update_crawl_progress = _update
complete_crawl_progress = _complete
# Import strategies
from .strategies.batch import BatchCrawlStrategy
from .strategies.recursive import RecursiveCrawlStrategy
@@ -44,15 +50,15 @@ from .progress_mapper import ProgressMapper
logger = get_logger(__name__)
# Global registry to track active orchestration services for cancellation support
_active_orchestrations: Dict[str, 'CrawlingService'] = {}
_active_orchestrations: Dict[str, "CrawlingService"] = {}
def get_active_orchestration(progress_id: str) -> Optional['CrawlingService']:
def get_active_orchestration(progress_id: str) -> Optional["CrawlingService"]:
"""Get an active orchestration service by progress ID."""
return _active_orchestrations.get(progress_id)
def register_orchestration(progress_id: str, orchestration: 'CrawlingService'):
def register_orchestration(progress_id: str, orchestration: "CrawlingService"):
"""Register an active orchestration service."""
_active_orchestrations[progress_id] = orchestration
@@ -68,11 +74,11 @@ class CrawlingService:
Service class for web crawling and orchestration operations.
Combines functionality from both CrawlingService and CrawlOrchestrationService.
"""
def __init__(self, crawler=None, supabase_client=None, progress_id=None):
"""
Initialize the crawling service.
Args:
crawler: The Crawl4AI crawler instance
supabase_client: The Supabase client for database operations
@@ -81,92 +87,97 @@ class CrawlingService:
self.crawler = crawler
self.supabase_client = supabase_client or get_supabase_client()
self.progress_id = progress_id
# Initialize helpers
self.url_handler = URLHandler()
self.site_config = SiteConfig()
self.markdown_generator = self.site_config.get_markdown_generator()
# Initialize strategies
self.batch_strategy = BatchCrawlStrategy(crawler, self.markdown_generator)
self.recursive_strategy = RecursiveCrawlStrategy(crawler, self.markdown_generator)
self.single_page_strategy = SinglePageCrawlStrategy(crawler, self.markdown_generator)
self.sitemap_strategy = SitemapCrawlStrategy()
# Initialize operations
self.doc_storage_ops = DocumentStorageOperations(self.supabase_client)
# Track progress state across all stages to prevent UI resets
self.progress_state = {'progressId': self.progress_id} if self.progress_id else {}
self.progress_state = {"progressId": self.progress_id} if self.progress_id else {}
# Initialize progress mapper to prevent backwards jumps
self.progress_mapper = ProgressMapper()
# Cancellation support
self._cancelled = False
def set_progress_id(self, progress_id: str):
"""Set the progress ID for Socket.IO updates."""
self.progress_id = progress_id
if self.progress_id:
self.progress_state = {'progressId': self.progress_id}
self.progress_state = {"progressId": self.progress_id}
def cancel(self):
"""Cancel the crawl operation."""
self._cancelled = True
safe_logfire_info(f"Crawl operation cancelled | progress_id={self.progress_id}")
def is_cancelled(self) -> bool:
"""Check if the crawl operation has been cancelled."""
return self._cancelled
def _check_cancellation(self):
"""Check if cancelled and raise an exception if so."""
if self._cancelled:
raise asyncio.CancelledError("Crawl operation was cancelled by user")
async def _create_crawl_progress_callback(self, base_status: str) -> Callable[[str, int, str], Awaitable[None]]:
async def _create_crawl_progress_callback(
self, base_status: str
) -> Callable[[str, int, str], Awaitable[None]]:
"""Create a progress callback for crawling operations.
Args:
base_status: The base status to use for progress updates
Returns:
Async callback function with signature (status: str, percentage: int, message: str, **kwargs) -> None
"""
_ensure_socketio_imports()
async def callback(status: str, percentage: int, message: str, **kwargs):
if self.progress_id:
# Update and preserve progress state
self.progress_state.update({
'status': base_status,
'percentage': percentage,
'log': message,
**kwargs
"status": base_status,
"percentage": percentage,
"log": message,
**kwargs,
})
safe_logfire_info(f"Emitting crawl progress | progress_id={self.progress_id} | status={base_status} | percentage={percentage}")
safe_logfire_info(
f"Emitting crawl progress | progress_id={self.progress_id} | status={base_status} | percentage={percentage}"
)
await update_crawl_progress(self.progress_id, self.progress_state)
return callback
async def _handle_progress_update(self, task_id: str, update: Dict[str, Any]) -> None:
"""
Handle progress updates from background task.
Args:
task_id: The task ID for the progress update
update: Dictionary containing progress update data
"""
_ensure_socketio_imports()
if self.progress_id:
# Update and preserve progress state
self.progress_state.update(update)
# Ensure progressId is always included
if self.progress_id and 'progressId' not in self.progress_state:
self.progress_state['progressId'] = self.progress_id
if self.progress_id and "progressId" not in self.progress_state:
self.progress_state["progressId"] = self.progress_id
# Always emit progress updates for real-time feedback
await update_crawl_progress(self.progress_id, self.progress_state)
# Simple delegation methods for backward compatibility
async def crawl_single_page(self, url: str, retry_count: int = 3) -> Dict[str, Any]:
"""Crawl a single web page."""
@@ -174,27 +185,33 @@ class CrawlingService:
url,
self.url_handler.transform_github_url,
self.site_config.is_documentation_site,
retry_count
retry_count,
)
async def crawl_markdown_file(self, url: str, progress_callback=None,
start_progress: int = 10, end_progress: int = 20) -> List[Dict[str, Any]]:
async def crawl_markdown_file(
self, url: str, progress_callback=None, start_progress: int = 10, end_progress: int = 20
) -> List[Dict[str, Any]]:
"""Crawl a .txt or markdown file."""
return await self.single_page_strategy.crawl_markdown_file(
url,
self.url_handler.transform_github_url,
progress_callback,
start_progress,
end_progress
end_progress,
)
def parse_sitemap(self, sitemap_url: str) -> List[str]:
"""Parse a sitemap and extract URLs."""
return self.sitemap_strategy.parse_sitemap(sitemap_url)
async def crawl_batch_with_progress(self, urls: List[str], max_concurrent: int = None,
progress_callback=None, start_progress: int = 15,
end_progress: int = 60) -> List[Dict[str, Any]]:
async def crawl_batch_with_progress(
self,
urls: List[str],
max_concurrent: int = None,
progress_callback=None,
start_progress: int = 15,
end_progress: int = 60,
) -> List[Dict[str, Any]]:
"""Batch crawl multiple URLs in parallel."""
return await self.batch_strategy.crawl_batch_with_progress(
urls,
@@ -203,12 +220,18 @@ class CrawlingService:
max_concurrent,
progress_callback,
start_progress,
end_progress
end_progress,
)
async def crawl_recursive_with_progress(self, start_urls: List[str], max_depth: int = 3,
max_concurrent: int = None, progress_callback=None,
start_progress: int = 10, end_progress: int = 60) -> List[Dict[str, Any]]:
async def crawl_recursive_with_progress(
self,
start_urls: List[str],
max_depth: int = 3,
max_concurrent: int = None,
progress_callback=None,
start_progress: int = 10,
end_progress: int = 60,
) -> List[Dict[str, Any]]:
"""Recursively crawl internal links from start URLs."""
return await self.recursive_strategy.crawl_recursive_with_progress(
start_urls,
@@ -218,150 +241,164 @@ class CrawlingService:
max_concurrent,
progress_callback,
start_progress,
end_progress
end_progress,
)
# Orchestration methods
async def orchestrate_crawl(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""
Main orchestration method - non-blocking using asyncio.create_task.
Args:
request: The crawl request containing url, knowledge_type, tags, max_depth, etc.
Returns:
Dict containing task_id and status
"""
url = str(request.get('url', ''))
url = str(request.get("url", ""))
safe_logfire_info(f"Starting background crawl orchestration | url={url}")
# Create task ID
task_id = self.progress_id or str(uuid.uuid4())
# Register this orchestration service for cancellation support
if self.progress_id:
register_orchestration(self.progress_id, self)
# Start the crawl as an async task in the main event loop
asyncio.create_task(self._async_orchestrate_crawl(request, task_id))
# Return immediately
return {
"task_id": task_id,
"status": "started",
"message": f"Crawl operation started for {url}",
"progress_id": self.progress_id
"progress_id": self.progress_id,
}
async def _async_orchestrate_crawl(self, request: Dict[str, Any], task_id: str):
"""
Async orchestration that runs in the main event loop.
"""
last_heartbeat = asyncio.get_event_loop().time()
heartbeat_interval = 30.0 # Send heartbeat every 30 seconds
async def send_heartbeat_if_needed():
"""Send heartbeat to keep Socket.IO connection alive"""
nonlocal last_heartbeat
current_time = asyncio.get_event_loop().time()
if current_time - last_heartbeat >= heartbeat_interval:
await self._handle_progress_update(task_id, {
'status': self.progress_mapper.get_current_stage(),
'percentage': self.progress_mapper.get_current_progress(),
'heartbeat': True,
'log': 'Background task still running...',
'message': 'Processing...'
})
await self._handle_progress_update(
task_id,
{
"status": self.progress_mapper.get_current_stage(),
"percentage": self.progress_mapper.get_current_progress(),
"heartbeat": True,
"log": "Background task still running...",
"message": "Processing...",
},
)
last_heartbeat = current_time
try:
url = str(request.get('url', ''))
url = str(request.get("url", ""))
safe_logfire_info(f"Starting async crawl orchestration | url={url} | task_id={task_id}")
# Extract source_id from the original URL
parsed_original_url = urlparse(url)
original_source_id = parsed_original_url.netloc or parsed_original_url.path
safe_logfire_info(f"Using source_id '{original_source_id}' from original URL '{url}'")
# Helper to update progress with mapper
async def update_mapped_progress(stage: str, stage_progress: int, message: str, **kwargs):
async def update_mapped_progress(
stage: str, stage_progress: int, message: str, **kwargs
):
overall_progress = self.progress_mapper.map_progress(stage, stage_progress)
await self._handle_progress_update(task_id, {
'status': stage,
'percentage': overall_progress,
'log': message,
'message': message,
**kwargs
})
await self._handle_progress_update(
task_id,
{
"status": stage,
"percentage": overall_progress,
"log": message,
"message": message,
**kwargs,
},
)
# Initial progress
await update_mapped_progress('starting', 100, f'Starting crawl of {url}', currentUrl=url)
await update_mapped_progress(
"starting", 100, f"Starting crawl of {url}", currentUrl=url
)
# Check for cancellation before proceeding
self._check_cancellation()
# Analyzing stage
await update_mapped_progress('analyzing', 50, f'Analyzing URL type for {url}')
await update_mapped_progress("analyzing", 50, f"Analyzing URL type for {url}")
# Detect URL type and perform crawl
crawl_results, crawl_type = await self._crawl_by_url_type(url, request)
# Check for cancellation after crawling
self._check_cancellation()
# Send heartbeat after potentially long crawl operation
await send_heartbeat_if_needed()
if not crawl_results:
raise ValueError("No content was crawled from the provided URL")
# Processing stage
await update_mapped_progress('processing', 50, 'Processing crawled content')
await update_mapped_progress("processing", 50, "Processing crawled content")
# Check for cancellation before document processing
self._check_cancellation()
# Process and store documents using document storage operations
async def doc_storage_callback(message: str, percentage: int, batch_info: Optional[dict] = None):
async def doc_storage_callback(
message: str, percentage: int, batch_info: Optional[dict] = None
):
if self.progress_id:
_ensure_socketio_imports()
# Map percentage to document storage range (20-85%)
mapped_percentage = 20 + int((percentage / 100) * (85 - 20))
safe_logfire_info(f"Document storage progress mapping: {percentage}% -> {mapped_percentage}%")
safe_logfire_info(
f"Document storage progress mapping: {percentage}% -> {mapped_percentage}%"
)
# Update progress state while preserving existing fields
self.progress_state.update({
'status': 'document_storage',
'percentage': mapped_percentage,
'log': message
"status": "document_storage",
"percentage": mapped_percentage,
"log": message,
})
# Add batch_info fields if provided
if batch_info:
self.progress_state.update(batch_info)
await update_crawl_progress(self.progress_id, self.progress_state)
storage_results = await self.doc_storage_ops.process_and_store_documents(
crawl_results,
request,
crawl_type,
original_source_id,
doc_storage_callback,
self._check_cancellation
self._check_cancellation,
)
# Check for cancellation after document storage
self._check_cancellation()
# Send heartbeat after document storage
await send_heartbeat_if_needed()
# Extract code examples if requested
code_examples_count = 0
if request.get('extract_code_examples', True):
await update_mapped_progress('code_extraction', 0, 'Starting code extraction...')
if request.get("extract_code_examples", True):
await update_mapped_progress("code_extraction", 0, "Starting code extraction...")
# Create progress callback for code extraction
async def code_progress_callback(data: dict):
if self.progress_id:
@@ -369,156 +406,169 @@ class CrawlingService:
# Update progress state while preserving existing fields
self.progress_state.update(data)
await update_crawl_progress(self.progress_id, self.progress_state)
code_examples_count = await self.doc_storage_ops.extract_and_store_code_examples(
crawl_results,
storage_results['url_to_full_document'],
storage_results["url_to_full_document"],
code_progress_callback,
85,
95
95,
)
# Send heartbeat after code extraction
await send_heartbeat_if_needed()
# Finalization
await update_mapped_progress(
'finalization', 50, 'Finalizing crawl results...',
chunks_stored=storage_results['chunk_count'],
code_examples_found=code_examples_count
"finalization",
50,
"Finalizing crawl results...",
chunks_stored=storage_results["chunk_count"],
code_examples_found=code_examples_count,
)
# Complete - send both the progress update and completion event
await update_mapped_progress(
'completed', 100,
f'Crawl completed: {storage_results["chunk_count"]} chunks, {code_examples_count} code examples',
chunks_stored=storage_results['chunk_count'],
"completed",
100,
f"Crawl completed: {storage_results['chunk_count']} chunks, {code_examples_count} code examples",
chunks_stored=storage_results["chunk_count"],
code_examples_found=code_examples_count,
processed_pages=len(crawl_results),
total_pages=len(crawl_results)
total_pages=len(crawl_results),
)
# Also send the completion event that frontend expects
_ensure_socketio_imports()
await complete_crawl_progress(task_id, {
'chunks_stored': storage_results['chunk_count'],
'code_examples_found': code_examples_count,
'processed_pages': len(crawl_results),
'total_pages': len(crawl_results),
'sourceId': storage_results.get('source_id', ''),
'log': 'Crawl completed successfully!'
})
await complete_crawl_progress(
task_id,
{
"chunks_stored": storage_results["chunk_count"],
"code_examples_found": code_examples_count,
"processed_pages": len(crawl_results),
"total_pages": len(crawl_results),
"sourceId": storage_results.get("source_id", ""),
"log": "Crawl completed successfully!",
},
)
# Unregister after successful completion
if self.progress_id:
unregister_orchestration(self.progress_id)
safe_logfire_info(f"Unregistered orchestration service after completion | progress_id={self.progress_id}")
safe_logfire_info(
f"Unregistered orchestration service after completion | progress_id={self.progress_id}"
)
except asyncio.CancelledError:
safe_logfire_info(f"Crawl operation cancelled | progress_id={self.progress_id}")
await self._handle_progress_update(task_id, {
'status': 'cancelled',
'percentage': -1,
'log': 'Crawl operation was cancelled by user'
})
await self._handle_progress_update(
task_id,
{
"status": "cancelled",
"percentage": -1,
"log": "Crawl operation was cancelled by user",
},
)
# Unregister on cancellation
if self.progress_id:
unregister_orchestration(self.progress_id)
safe_logfire_info(f"Unregistered orchestration service on cancellation | progress_id={self.progress_id}")
safe_logfire_info(
f"Unregistered orchestration service on cancellation | progress_id={self.progress_id}"
)
except Exception as e:
safe_logfire_error(f"Async crawl orchestration failed | error={str(e)}")
await self._handle_progress_update(task_id, {
'status': 'error',
'percentage': -1,
'log': f'Crawl failed: {str(e)}'
})
await self._handle_progress_update(
task_id, {"status": "error", "percentage": -1, "log": f"Crawl failed: {str(e)}"}
)
# Unregister on error
if self.progress_id:
unregister_orchestration(self.progress_id)
safe_logfire_info(f"Unregistered orchestration service on error | progress_id={self.progress_id}")
safe_logfire_info(
f"Unregistered orchestration service on error | progress_id={self.progress_id}"
)
async def _crawl_by_url_type(self, url: str, request: Dict[str, Any]) -> tuple:
"""
Detect URL type and perform appropriate crawling.
Returns:
Tuple of (crawl_results, crawl_type)
"""
_ensure_socketio_imports()
crawl_results = []
crawl_type = None
if self.url_handler.is_txt(url):
# Handle text files
if self.progress_id:
self.progress_state.update({
'status': 'crawling',
'percentage': 10,
'log': 'Detected text file, fetching content...'
"status": "crawling",
"percentage": 10,
"log": "Detected text file, fetching content...",
})
await update_crawl_progress(self.progress_id, self.progress_state)
crawl_results = await self.crawl_markdown_file(
url,
progress_callback=await self._create_crawl_progress_callback('crawling'),
progress_callback=await self._create_crawl_progress_callback("crawling"),
start_progress=10,
end_progress=20
end_progress=20,
)
crawl_type = "text_file"
elif self.url_handler.is_sitemap(url):
# Handle sitemaps
if self.progress_id:
self.progress_state.update({
'status': 'crawling',
'percentage': 10,
'log': 'Detected sitemap, parsing URLs...'
"status": "crawling",
"percentage": 10,
"log": "Detected sitemap, parsing URLs...",
})
await update_crawl_progress(self.progress_id, self.progress_state)
sitemap_urls = self.parse_sitemap(url)
if sitemap_urls:
# Emit progress before starting batch crawl
if self.progress_id:
self.progress_state.update({
'status': 'crawling',
'percentage': 15,
'log': f'Starting batch crawl of {len(sitemap_urls)} URLs...'
"status": "crawling",
"percentage": 15,
"log": f"Starting batch crawl of {len(sitemap_urls)} URLs...",
})
await update_crawl_progress(self.progress_id, self.progress_state)
crawl_results = await self.crawl_batch_with_progress(
sitemap_urls,
progress_callback=await self._create_crawl_progress_callback('crawling'),
progress_callback=await self._create_crawl_progress_callback("crawling"),
start_progress=15,
end_progress=20
end_progress=20,
)
crawl_type = "sitemap"
else:
# Handle regular webpages with recursive crawling
if self.progress_id:
self.progress_state.update({
'status': 'crawling',
'percentage': 10,
'log': f'Starting recursive crawl with max depth {request.get("max_depth", 1)}...'
"status": "crawling",
"percentage": 10,
"log": f"Starting recursive crawl with max depth {request.get('max_depth', 1)}...",
})
await update_crawl_progress(self.progress_id, self.progress_state)
max_depth = request.get('max_depth', 1)
# Limit concurrent crawls for better performance
max_concurrent = 20 if self.site_config.is_documentation_site(url) else 10
max_depth = request.get("max_depth", 1)
# Let the strategy handle concurrency from settings
# This will use CRAWL_MAX_CONCURRENT from database (default: 10)
crawl_results = await self.crawl_recursive_with_progress(
[url],
max_depth=max_depth,
max_concurrent=max_concurrent,
progress_callback=await self._create_crawl_progress_callback('crawling'),
max_concurrent=None, # Let strategy use settings
progress_callback=await self._create_crawl_progress_callback("crawling"),
start_progress=10,
end_progress=20
end_progress=20,
)
crawl_type = "webpage"
return crawl_results, crawl_type


@@ -3,6 +3,7 @@ Batch Crawling Strategy
Handles batch crawling of multiple URLs in parallel.
"""
import asyncio
from typing import List, Dict, Any, Optional, Callable
@@ -15,18 +16,18 @@ logger = get_logger(__name__)
class BatchCrawlStrategy:
"""Strategy for crawling multiple URLs in batch."""
def __init__(self, crawler, markdown_generator):
"""
Initialize batch crawl strategy.
Args:
crawler (AsyncWebCrawler): The Crawl4AI crawler instance for web crawling operations
markdown_generator (DefaultMarkdownGenerator): The markdown generator instance for converting HTML to markdown
"""
self.crawler = crawler
self.markdown_generator = markdown_generator
async def crawl_batch_with_progress(
self,
urls: List[str],
@@ -35,11 +36,11 @@ class BatchCrawlStrategy:
max_concurrent: int = None,
progress_callback: Optional[Callable] = None,
start_progress: int = 15,
end_progress: int = 60
end_progress: int = 60,
) -> List[Dict[str, Any]]:
"""
Batch crawl multiple URLs in parallel with progress reporting.
Args:
urls: List of URLs to crawl
transform_url_func: Function to transform URLs (e.g., GitHub URLs)
@@ -48,16 +49,16 @@ class BatchCrawlStrategy:
progress_callback: Optional callback for progress updates
start_progress: Starting progress percentage
end_progress: Ending progress percentage
Returns:
List of crawl results
"""
if not self.crawler:
logger.error("No crawler instance available for batch crawling")
if progress_callback:
await progress_callback('error', 0, 'Crawler not available')
await progress_callback("error", 0, "Crawler not available")
return []
# Load settings from database first
try:
settings = await credential_service.get_credentials_by_category("rag_strategy")
@@ -74,10 +75,10 @@
memory_threshold = 80.0
check_interval = 0.5
settings = {} # Empty dict for defaults
# Check if any URLs are documentation sites
has_doc_sites = any(is_documentation_site_func(url) for url in urls)
if has_doc_sites:
logger.info("Detected documentation sites in batch, using enhanced configuration")
# Use generic documentation selectors for batch crawling
@@ -85,7 +86,7 @@
cache_mode=CacheMode.BYPASS,
stream=True, # Enable streaming for faster parallel processing
markdown_generator=self.markdown_generator,
wait_for='body', # Simple selector for batch
wait_for="body", # Simple selector for batch
wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "30000")),
delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "1.0")),
@@ -93,7 +94,7 @@
scan_full_page=True, # Trigger lazy loading
exclude_all_images=False,
remove_overlay_elements=True,
process_iframes=True
process_iframes=True,
)
else:
# Configuration for regular batch crawling
@@ -104,27 +105,27 @@
wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "45000")),
delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "0.5")),
scan_full_page=True
scan_full_page=True,
)
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=memory_threshold,
check_interval=check_interval,
max_session_permit=max_concurrent
max_session_permit=max_concurrent,
)
async def report_progress(percentage: int, message: str):
"""Helper to report progress if callback is available"""
if progress_callback:
await progress_callback('crawling', percentage, message)
await progress_callback("crawling", percentage, message)
total_urls = len(urls)
await report_progress(start_progress, f'Starting to crawl {total_urls} URLs...')
await report_progress(start_progress, f"Starting to crawl {total_urls} URLs...")
# Use configured batch size
successful_results = []
processed = 0
# Transform all URLs at the beginning
url_mapping = {} # Map transformed URLs back to original
transformed_urls = []
@@ -132,20 +133,29 @@
transformed = transform_url_func(url)
transformed_urls.append(transformed)
url_mapping[transformed] = url
for i in range(0, total_urls, batch_size):
batch_urls = transformed_urls[i:i + batch_size]
batch_urls = transformed_urls[i : i + batch_size]
batch_start = i
batch_end = min(i + batch_size, total_urls)
# Report batch start with smooth progress
progress_percentage = start_progress + int((i / total_urls) * (end_progress - start_progress))
await report_progress(progress_percentage, f'Processing batch {batch_start+1}-{batch_end} of {total_urls} URLs...')
progress_percentage = start_progress + int(
(i / total_urls) * (end_progress - start_progress)
)
await report_progress(
progress_percentage,
f"Processing batch {batch_start + 1}-{batch_end} of {total_urls} URLs...",
)
# Crawl this batch using arun_many with streaming
logger.info(f"Starting parallel crawl of batch {batch_start+1}-{batch_end} ({len(batch_urls)} URLs)")
batch_results = await self.crawler.arun_many(urls=batch_urls, config=crawl_config, dispatcher=dispatcher)
logger.info(
f"Starting parallel crawl of batch {batch_start + 1}-{batch_end} ({len(batch_urls)} URLs)"
)
batch_results = await self.crawler.arun_many(
urls=batch_urls, config=crawl_config, dispatcher=dispatcher
)
# Handle streaming results
j = 0
async for result in batch_results:
@@ -154,19 +164,31 @@
# Map back to original URL
original_url = url_mapping.get(result.url, result.url)
successful_results.append({
'url': original_url,
'markdown': result.markdown,
'html': result.html # Use raw HTML
"url": original_url,
"markdown": result.markdown,
"html": result.html, # Use raw HTML
})
else:
logger.warning(f"Failed to crawl {result.url}: {getattr(result, 'error_message', 'Unknown error')}")
logger.warning(
f"Failed to crawl {result.url}: {getattr(result, 'error_message', 'Unknown error')}"
)
# Report individual URL progress with smooth increments
progress_percentage = start_progress + int((processed / total_urls) * (end_progress - start_progress))
progress_percentage = start_progress + int(
(processed / total_urls) * (end_progress - start_progress)
)
# Report more frequently for smoother progress
if processed % 5 == 0 or processed == total_urls: # Report every 5 URLs or at the end
await report_progress(progress_percentage, f'Crawled {processed}/{total_urls} pages ({len(successful_results)} successful)')
if (
processed % 5 == 0 or processed == total_urls
): # Report every 5 URLs or at the end
await report_progress(
progress_percentage,
f"Crawled {processed}/{total_urls} pages ({len(successful_results)} successful)",
)
j += 1
await report_progress(end_progress, f'Batch crawling completed: {len(successful_results)}/{total_urls} pages successful')
return successful_results
await report_progress(
end_progress,
f"Batch crawling completed: {len(successful_results)}/{total_urls} pages successful",
)
return successful_results