Merge pull request #213 from coleam00/fix/consolidate-concurrency-settings

Fix crawler concurrency configuration to prevent memory crashes
Authored by Wirasm on 2025-08-16 00:38:45 +03:00, committed by GitHub
commit f96a9a4c4a
4 changed files with 312 additions and 225 deletions

View File

@@ -33,4 +33,9 @@ EMBEDDING_DIMENSIONS=1536
# - OPENAI_API_KEY (encrypted)
# - MODEL_CHOICE
# - TRANSPORT settings
# - RAG strategy flags (USE_CONTEXTUAL_EMBEDDINGS, USE_HYBRID_SEARCH, etc.)
# - Crawler settings:
# * CRAWL_MAX_CONCURRENT (default: 10) - Max concurrent pages per crawl operation
# * CRAWL_BATCH_SIZE (default: 50) - URLs processed per batch
# * MEMORY_THRESHOLD_PERCENT (default: 80) - Memory % before throttling
# * DISPATCHER_CHECK_INTERVAL (default: 0.5) - Memory check interval in seconds
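The four settings above are consumed by the crawl strategies further down in this diff: they are read from the "rag_strategy" credential category and feed Crawl4AI's MemoryAdaptiveDispatcher. A minimal sketch of that wiring, assuming the top-level crawl4ai export and an already-initialized credential_service (the helper name build_dispatcher is illustrative, not part of this change):

from crawl4ai import MemoryAdaptiveDispatcher

async def build_dispatcher(credential_service):
    # Settings are stored in the database under the "rag_strategy" category.
    settings = await credential_service.get_credentials_by_category("rag_strategy")
    max_concurrent = int(settings.get("CRAWL_MAX_CONCURRENT", "10"))
    memory_threshold = float(settings.get("MEMORY_THRESHOLD_PERCENT", "80"))
    check_interval = float(settings.get("DISPATCHER_CHECK_INTERVAL", "0.5"))
    # The dispatcher stops admitting new crawl sessions once system memory
    # crosses memory_threshold, re-checking every check_interval seconds.
    return MemoryAdaptiveDispatcher(
        memory_threshold_percent=memory_threshold,
        check_interval=check_interval,
        max_session_permit=max_concurrent,
    )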

View File

@@ -19,14 +19,20 @@ from ...utils import get_supabase_client
update_crawl_progress = None
complete_crawl_progress = None
def _ensure_socketio_imports():
"""Ensure socket.IO handlers are imported."""
global update_crawl_progress, complete_crawl_progress
if update_crawl_progress is None:
from ...api_routes.socketio_handlers import update_crawl_progress as _update, complete_crawl_progress as _complete
from ...api_routes.socketio_handlers import (
update_crawl_progress as _update,
complete_crawl_progress as _complete,
)
update_crawl_progress = _update
complete_crawl_progress = _complete
# Import strategies
from .strategies.batch import BatchCrawlStrategy
from .strategies.recursive import RecursiveCrawlStrategy
@@ -44,15 +50,15 @@ from .progress_mapper import ProgressMapper
logger = get_logger(__name__)
# Global registry to track active orchestration services for cancellation support
_active_orchestrations: Dict[str, 'CrawlingService'] = {}
_active_orchestrations: Dict[str, "CrawlingService"] = {}
def get_active_orchestration(progress_id: str) -> Optional['CrawlingService']:
def get_active_orchestration(progress_id: str) -> Optional["CrawlingService"]:
"""Get an active orchestration service by progress ID."""
return _active_orchestrations.get(progress_id)
def register_orchestration(progress_id: str, orchestration: 'CrawlingService'):
def register_orchestration(progress_id: str, orchestration: "CrawlingService"):
"""Register an active orchestration service."""
_active_orchestrations[progress_id] = orchestration
@@ -68,11 +74,11 @@ class CrawlingService:
Service class for web crawling and orchestration operations.
Combines functionality from both CrawlingService and CrawlOrchestrationService.
"""
def __init__(self, crawler=None, supabase_client=None, progress_id=None):
"""
Initialize the crawling service.
Args:
crawler: The Crawl4AI crawler instance
supabase_client: The Supabase client for database operations
@@ -81,92 +87,97 @@ class CrawlingService:
self.crawler = crawler
self.supabase_client = supabase_client or get_supabase_client()
self.progress_id = progress_id
# Initialize helpers
self.url_handler = URLHandler()
self.site_config = SiteConfig()
self.markdown_generator = self.site_config.get_markdown_generator()
# Initialize strategies
self.batch_strategy = BatchCrawlStrategy(crawler, self.markdown_generator)
self.recursive_strategy = RecursiveCrawlStrategy(crawler, self.markdown_generator)
self.single_page_strategy = SinglePageCrawlStrategy(crawler, self.markdown_generator)
self.sitemap_strategy = SitemapCrawlStrategy()
# Initialize operations
self.doc_storage_ops = DocumentStorageOperations(self.supabase_client)
# Track progress state across all stages to prevent UI resets
self.progress_state = {'progressId': self.progress_id} if self.progress_id else {}
self.progress_state = {"progressId": self.progress_id} if self.progress_id else {}
# Initialize progress mapper to prevent backwards jumps
self.progress_mapper = ProgressMapper()
# Cancellation support
self._cancelled = False
def set_progress_id(self, progress_id: str):
"""Set the progress ID for Socket.IO updates."""
self.progress_id = progress_id
if self.progress_id:
self.progress_state = {'progressId': self.progress_id}
self.progress_state = {"progressId": self.progress_id}
def cancel(self):
"""Cancel the crawl operation."""
self._cancelled = True
safe_logfire_info(f"Crawl operation cancelled | progress_id={self.progress_id}")
def is_cancelled(self) -> bool:
"""Check if the crawl operation has been cancelled."""
return self._cancelled
def _check_cancellation(self):
"""Check if cancelled and raise an exception if so."""
if self._cancelled:
raise asyncio.CancelledError("Crawl operation was cancelled by user")
async def _create_crawl_progress_callback(self, base_status: str) -> Callable[[str, int, str], Awaitable[None]]:
async def _create_crawl_progress_callback(
self, base_status: str
) -> Callable[[str, int, str], Awaitable[None]]:
"""Create a progress callback for crawling operations.
Args:
base_status: The base status to use for progress updates
Returns:
Async callback function with signature (status: str, percentage: int, message: str, **kwargs) -> None
"""
_ensure_socketio_imports()
async def callback(status: str, percentage: int, message: str, **kwargs):
if self.progress_id:
# Update and preserve progress state
self.progress_state.update({
'status': base_status,
'percentage': percentage,
'log': message,
**kwargs
"status": base_status,
"percentage": percentage,
"log": message,
**kwargs,
})
safe_logfire_info(f"Emitting crawl progress | progress_id={self.progress_id} | status={base_status} | percentage={percentage}")
safe_logfire_info(
f"Emitting crawl progress | progress_id={self.progress_id} | status={base_status} | percentage={percentage}"
)
await update_crawl_progress(self.progress_id, self.progress_state)
return callback
async def _handle_progress_update(self, task_id: str, update: Dict[str, Any]) -> None:
"""
Handle progress updates from background task.
Args:
task_id: The task ID for the progress update
update: Dictionary containing progress update data
"""
_ensure_socketio_imports()
if self.progress_id:
# Update and preserve progress state
self.progress_state.update(update)
# Ensure progressId is always included
if self.progress_id and 'progressId' not in self.progress_state:
self.progress_state['progressId'] = self.progress_id
if self.progress_id and "progressId" not in self.progress_state:
self.progress_state["progressId"] = self.progress_id
# Always emit progress updates for real-time feedback
await update_crawl_progress(self.progress_id, self.progress_state)
# Simple delegation methods for backward compatibility
async def crawl_single_page(self, url: str, retry_count: int = 3) -> Dict[str, Any]:
"""Crawl a single web page."""
@@ -174,27 +185,33 @@ class CrawlingService:
url,
self.url_handler.transform_github_url,
self.site_config.is_documentation_site,
retry_count
retry_count,
)
async def crawl_markdown_file(self, url: str, progress_callback=None,
start_progress: int = 10, end_progress: int = 20) -> List[Dict[str, Any]]:
async def crawl_markdown_file(
self, url: str, progress_callback=None, start_progress: int = 10, end_progress: int = 20
) -> List[Dict[str, Any]]:
"""Crawl a .txt or markdown file."""
return await self.single_page_strategy.crawl_markdown_file(
url,
self.url_handler.transform_github_url,
progress_callback,
start_progress,
end_progress
end_progress,
)
def parse_sitemap(self, sitemap_url: str) -> List[str]:
"""Parse a sitemap and extract URLs."""
return self.sitemap_strategy.parse_sitemap(sitemap_url)
async def crawl_batch_with_progress(self, urls: List[str], max_concurrent: int = None,
progress_callback=None, start_progress: int = 15,
end_progress: int = 60) -> List[Dict[str, Any]]:
async def crawl_batch_with_progress(
self,
urls: List[str],
max_concurrent: int = None,
progress_callback=None,
start_progress: int = 15,
end_progress: int = 60,
) -> List[Dict[str, Any]]:
"""Batch crawl multiple URLs in parallel."""
return await self.batch_strategy.crawl_batch_with_progress(
urls,
@@ -203,12 +220,18 @@ class CrawlingService:
max_concurrent,
progress_callback,
start_progress,
end_progress
end_progress,
)
async def crawl_recursive_with_progress(self, start_urls: List[str], max_depth: int = 3,
max_concurrent: int = None, progress_callback=None,
start_progress: int = 10, end_progress: int = 60) -> List[Dict[str, Any]]:
async def crawl_recursive_with_progress(
self,
start_urls: List[str],
max_depth: int = 3,
max_concurrent: int = None,
progress_callback=None,
start_progress: int = 10,
end_progress: int = 60,
) -> List[Dict[str, Any]]:
"""Recursively crawl internal links from start URLs."""
return await self.recursive_strategy.crawl_recursive_with_progress(
start_urls,
@@ -218,150 +241,164 @@ class CrawlingService:
max_concurrent,
progress_callback,
start_progress,
end_progress
end_progress,
)
# Orchestration methods
async def orchestrate_crawl(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""
Main orchestration method - non-blocking using asyncio.create_task.
Args:
request: The crawl request containing url, knowledge_type, tags, max_depth, etc.
Returns:
Dict containing task_id and status
"""
url = str(request.get('url', ''))
url = str(request.get("url", ""))
safe_logfire_info(f"Starting background crawl orchestration | url={url}")
# Create task ID
task_id = self.progress_id or str(uuid.uuid4())
# Register this orchestration service for cancellation support
if self.progress_id:
register_orchestration(self.progress_id, self)
# Start the crawl as an async task in the main event loop
asyncio.create_task(self._async_orchestrate_crawl(request, task_id))
# Return immediately
return {
"task_id": task_id,
"status": "started",
"message": f"Crawl operation started for {url}",
"progress_id": self.progress_id
"progress_id": self.progress_id,
}
async def _async_orchestrate_crawl(self, request: Dict[str, Any], task_id: str):
"""
Async orchestration that runs in the main event loop.
"""
last_heartbeat = asyncio.get_event_loop().time()
heartbeat_interval = 30.0 # Send heartbeat every 30 seconds
async def send_heartbeat_if_needed():
"""Send heartbeat to keep Socket.IO connection alive"""
nonlocal last_heartbeat
current_time = asyncio.get_event_loop().time()
if current_time - last_heartbeat >= heartbeat_interval:
await self._handle_progress_update(task_id, {
'status': self.progress_mapper.get_current_stage(),
'percentage': self.progress_mapper.get_current_progress(),
'heartbeat': True,
'log': 'Background task still running...',
'message': 'Processing...'
})
await self._handle_progress_update(
task_id,
{
"status": self.progress_mapper.get_current_stage(),
"percentage": self.progress_mapper.get_current_progress(),
"heartbeat": True,
"log": "Background task still running...",
"message": "Processing...",
},
)
last_heartbeat = current_time
try:
url = str(request.get('url', ''))
url = str(request.get("url", ""))
safe_logfire_info(f"Starting async crawl orchestration | url={url} | task_id={task_id}")
# Extract source_id from the original URL
parsed_original_url = urlparse(url)
original_source_id = parsed_original_url.netloc or parsed_original_url.path
safe_logfire_info(f"Using source_id '{original_source_id}' from original URL '{url}'")
# Helper to update progress with mapper
async def update_mapped_progress(stage: str, stage_progress: int, message: str, **kwargs):
async def update_mapped_progress(
stage: str, stage_progress: int, message: str, **kwargs
):
overall_progress = self.progress_mapper.map_progress(stage, stage_progress)
await self._handle_progress_update(task_id, {
'status': stage,
'percentage': overall_progress,
'log': message,
'message': message,
**kwargs
})
await self._handle_progress_update(
task_id,
{
"status": stage,
"percentage": overall_progress,
"log": message,
"message": message,
**kwargs,
},
)
# Initial progress
await update_mapped_progress('starting', 100, f'Starting crawl of {url}', currentUrl=url)
await update_mapped_progress(
"starting", 100, f"Starting crawl of {url}", currentUrl=url
)
# Check for cancellation before proceeding
self._check_cancellation()
# Analyzing stage
await update_mapped_progress('analyzing', 50, f'Analyzing URL type for {url}')
await update_mapped_progress("analyzing", 50, f"Analyzing URL type for {url}")
# Detect URL type and perform crawl
crawl_results, crawl_type = await self._crawl_by_url_type(url, request)
# Check for cancellation after crawling
self._check_cancellation()
# Send heartbeat after potentially long crawl operation
await send_heartbeat_if_needed()
if not crawl_results:
raise ValueError("No content was crawled from the provided URL")
# Processing stage
await update_mapped_progress('processing', 50, 'Processing crawled content')
await update_mapped_progress("processing", 50, "Processing crawled content")
# Check for cancellation before document processing
self._check_cancellation()
# Process and store documents using document storage operations
async def doc_storage_callback(message: str, percentage: int, batch_info: Optional[dict] = None):
async def doc_storage_callback(
message: str, percentage: int, batch_info: Optional[dict] = None
):
if self.progress_id:
_ensure_socketio_imports()
# Map percentage to document storage range (20-85%)
mapped_percentage = 20 + int((percentage / 100) * (85 - 20))
safe_logfire_info(f"Document storage progress mapping: {percentage}% -> {mapped_percentage}%")
safe_logfire_info(
f"Document storage progress mapping: {percentage}% -> {mapped_percentage}%"
)
# Update progress state while preserving existing fields
self.progress_state.update({
'status': 'document_storage',
'percentage': mapped_percentage,
'log': message
"status": "document_storage",
"percentage": mapped_percentage,
"log": message,
})
# Add batch_info fields if provided
if batch_info:
self.progress_state.update(batch_info)
await update_crawl_progress(self.progress_id, self.progress_state)
storage_results = await self.doc_storage_ops.process_and_store_documents(
crawl_results,
request,
crawl_type,
original_source_id,
doc_storage_callback,
self._check_cancellation
self._check_cancellation,
)
# Check for cancellation after document storage
self._check_cancellation()
# Send heartbeat after document storage
await send_heartbeat_if_needed()
# Extract code examples if requested
code_examples_count = 0
if request.get('extract_code_examples', True):
await update_mapped_progress('code_extraction', 0, 'Starting code extraction...')
if request.get("extract_code_examples", True):
await update_mapped_progress("code_extraction", 0, "Starting code extraction...")
# Create progress callback for code extraction
async def code_progress_callback(data: dict):
if self.progress_id:
@@ -369,156 +406,169 @@ class CrawlingService:
# Update progress state while preserving existing fields
self.progress_state.update(data)
await update_crawl_progress(self.progress_id, self.progress_state)
code_examples_count = await self.doc_storage_ops.extract_and_store_code_examples(
crawl_results,
storage_results['url_to_full_document'],
storage_results["url_to_full_document"],
code_progress_callback,
85,
95
95,
)
# Send heartbeat after code extraction
await send_heartbeat_if_needed()
# Finalization
await update_mapped_progress(
'finalization', 50, 'Finalizing crawl results...',
chunks_stored=storage_results['chunk_count'],
code_examples_found=code_examples_count
"finalization",
50,
"Finalizing crawl results...",
chunks_stored=storage_results["chunk_count"],
code_examples_found=code_examples_count,
)
# Complete - send both the progress update and completion event
await update_mapped_progress(
'completed', 100,
f'Crawl completed: {storage_results["chunk_count"]} chunks, {code_examples_count} code examples',
chunks_stored=storage_results['chunk_count'],
"completed",
100,
f"Crawl completed: {storage_results['chunk_count']} chunks, {code_examples_count} code examples",
chunks_stored=storage_results["chunk_count"],
code_examples_found=code_examples_count,
processed_pages=len(crawl_results),
total_pages=len(crawl_results)
total_pages=len(crawl_results),
)
# Also send the completion event that frontend expects
_ensure_socketio_imports()
await complete_crawl_progress(task_id, {
'chunks_stored': storage_results['chunk_count'],
'code_examples_found': code_examples_count,
'processed_pages': len(crawl_results),
'total_pages': len(crawl_results),
'sourceId': storage_results.get('source_id', ''),
'log': 'Crawl completed successfully!'
})
await complete_crawl_progress(
task_id,
{
"chunks_stored": storage_results["chunk_count"],
"code_examples_found": code_examples_count,
"processed_pages": len(crawl_results),
"total_pages": len(crawl_results),
"sourceId": storage_results.get("source_id", ""),
"log": "Crawl completed successfully!",
},
)
# Unregister after successful completion
if self.progress_id:
unregister_orchestration(self.progress_id)
safe_logfire_info(f"Unregistered orchestration service after completion | progress_id={self.progress_id}")
safe_logfire_info(
f"Unregistered orchestration service after completion | progress_id={self.progress_id}"
)
except asyncio.CancelledError:
safe_logfire_info(f"Crawl operation cancelled | progress_id={self.progress_id}")
await self._handle_progress_update(task_id, {
'status': 'cancelled',
'percentage': -1,
'log': 'Crawl operation was cancelled by user'
})
await self._handle_progress_update(
task_id,
{
"status": "cancelled",
"percentage": -1,
"log": "Crawl operation was cancelled by user",
},
)
# Unregister on cancellation
if self.progress_id:
unregister_orchestration(self.progress_id)
safe_logfire_info(f"Unregistered orchestration service on cancellation | progress_id={self.progress_id}")
safe_logfire_info(
f"Unregistered orchestration service on cancellation | progress_id={self.progress_id}"
)
except Exception as e:
safe_logfire_error(f"Async crawl orchestration failed | error={str(e)}")
await self._handle_progress_update(task_id, {
'status': 'error',
'percentage': -1,
'log': f'Crawl failed: {str(e)}'
})
await self._handle_progress_update(
task_id, {"status": "error", "percentage": -1, "log": f"Crawl failed: {str(e)}"}
)
# Unregister on error
if self.progress_id:
unregister_orchestration(self.progress_id)
safe_logfire_info(f"Unregistered orchestration service on error | progress_id={self.progress_id}")
safe_logfire_info(
f"Unregistered orchestration service on error | progress_id={self.progress_id}"
)
async def _crawl_by_url_type(self, url: str, request: Dict[str, Any]) -> tuple:
"""
Detect URL type and perform appropriate crawling.
Returns:
Tuple of (crawl_results, crawl_type)
"""
_ensure_socketio_imports()
crawl_results = []
crawl_type = None
if self.url_handler.is_txt(url):
# Handle text files
if self.progress_id:
self.progress_state.update({
'status': 'crawling',
'percentage': 10,
'log': 'Detected text file, fetching content...'
"status": "crawling",
"percentage": 10,
"log": "Detected text file, fetching content...",
})
await update_crawl_progress(self.progress_id, self.progress_state)
crawl_results = await self.crawl_markdown_file(
url,
progress_callback=await self._create_crawl_progress_callback('crawling'),
progress_callback=await self._create_crawl_progress_callback("crawling"),
start_progress=10,
end_progress=20
end_progress=20,
)
crawl_type = "text_file"
elif self.url_handler.is_sitemap(url):
# Handle sitemaps
if self.progress_id:
self.progress_state.update({
'status': 'crawling',
'percentage': 10,
'log': 'Detected sitemap, parsing URLs...'
"status": "crawling",
"percentage": 10,
"log": "Detected sitemap, parsing URLs...",
})
await update_crawl_progress(self.progress_id, self.progress_state)
sitemap_urls = self.parse_sitemap(url)
if sitemap_urls:
# Emit progress before starting batch crawl
if self.progress_id:
self.progress_state.update({
'status': 'crawling',
'percentage': 15,
'log': f'Starting batch crawl of {len(sitemap_urls)} URLs...'
"status": "crawling",
"percentage": 15,
"log": f"Starting batch crawl of {len(sitemap_urls)} URLs...",
})
await update_crawl_progress(self.progress_id, self.progress_state)
crawl_results = await self.crawl_batch_with_progress(
sitemap_urls,
progress_callback=await self._create_crawl_progress_callback('crawling'),
progress_callback=await self._create_crawl_progress_callback("crawling"),
start_progress=15,
end_progress=20
end_progress=20,
)
crawl_type = "sitemap"
else:
# Handle regular webpages with recursive crawling
if self.progress_id:
self.progress_state.update({
'status': 'crawling',
'percentage': 10,
'log': f'Starting recursive crawl with max depth {request.get("max_depth", 1)}...'
"status": "crawling",
"percentage": 10,
"log": f"Starting recursive crawl with max depth {request.get('max_depth', 1)}...",
})
await update_crawl_progress(self.progress_id, self.progress_state)
max_depth = request.get('max_depth', 1)
# Limit concurrent crawls for better performance
max_concurrent = 20 if self.site_config.is_documentation_site(url) else 10
max_depth = request.get("max_depth", 1)
# Let the strategy handle concurrency from settings
# This will use CRAWL_MAX_CONCURRENT from database (default: 10)
crawl_results = await self.crawl_recursive_with_progress(
[url],
max_depth=max_depth,
max_concurrent=max_concurrent,
progress_callback=await self._create_crawl_progress_callback('crawling'),
max_concurrent=None, # Let strategy use settings
progress_callback=await self._create_crawl_progress_callback("crawling"),
start_progress=10,
end_progress=20
end_progress=20,
)
crawl_type = "webpage"
return crawl_results, crawl_type
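One detail worth calling out from the orchestration above: the document-storage callback maps a sub-task's own 0-100% progress into the overall 20-85% band so the progress bar never jumps backwards between stages. A standalone sketch of that arithmetic (the function name is illustrative):

def map_to_band(percentage: int, band_start: int = 20, band_end: int = 85) -> int:
    # 0% of the sub-task lands at band_start overall, 100% at band_end.
    return band_start + int((percentage / 100) * (band_end - band_start))

assert map_to_band(0) == 20
assert map_to_band(50) == 52   # 20 + int(0.5 * 65)
assert map_to_band(100) == 85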

View File

@@ -3,6 +3,7 @@ Batch Crawling Strategy
Handles batch crawling of multiple URLs in parallel.
"""
import asyncio
from typing import List, Dict, Any, Optional, Callable
@@ -15,18 +16,18 @@ logger = get_logger(__name__)
class BatchCrawlStrategy:
"""Strategy for crawling multiple URLs in batch."""
def __init__(self, crawler, markdown_generator):
"""
Initialize batch crawl strategy.
Args:
crawler (AsyncWebCrawler): The Crawl4AI crawler instance for web crawling operations
markdown_generator (DefaultMarkdownGenerator): The markdown generator instance for converting HTML to markdown
"""
self.crawler = crawler
self.markdown_generator = markdown_generator
async def crawl_batch_with_progress(
self,
urls: List[str],
@@ -35,11 +36,11 @@ class BatchCrawlStrategy:
max_concurrent: int = None,
progress_callback: Optional[Callable] = None,
start_progress: int = 15,
end_progress: int = 60
end_progress: int = 60,
) -> List[Dict[str, Any]]:
"""
Batch crawl multiple URLs in parallel with progress reporting.
Args:
urls: List of URLs to crawl
transform_url_func: Function to transform URLs (e.g., GitHub URLs)
@@ -48,17 +49,17 @@ class BatchCrawlStrategy:
progress_callback: Optional callback for progress updates
start_progress: Starting progress percentage
end_progress: Ending progress percentage
Returns:
List of crawl results
"""
if not self.crawler:
logger.error("No crawler instance available for batch crawling")
if progress_callback:
await progress_callback('error', 0, 'Crawler not available')
await progress_callback("error", 0, "Crawler not available")
return []
# Load settings from database first
# Load settings from database - fail fast on configuration errors
try:
settings = await credential_service.get_credentials_by_category("rag_strategy")
batch_size = int(settings.get("CRAWL_BATCH_SIZE", "50"))
@@ -66,18 +67,23 @@ class BatchCrawlStrategy:
max_concurrent = int(settings.get("CRAWL_MAX_CONCURRENT", "10"))
memory_threshold = float(settings.get("MEMORY_THRESHOLD_PERCENT", "80"))
check_interval = float(settings.get("DISPATCHER_CHECK_INTERVAL", "0.5"))
except (ValueError, KeyError, TypeError) as e:
# Critical configuration errors should fail fast in alpha
logger.error(f"Invalid crawl settings format: {e}", exc_info=True)
raise ValueError(f"Failed to load crawler configuration: {e}")
except Exception as e:
logger.warning(f"Failed to load crawl settings: {e}, using defaults")
# For non-critical errors (e.g., network issues), use defaults but log prominently
logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True)
batch_size = 50
if max_concurrent is None:
max_concurrent = 10
max_concurrent = 10 # Safe default to prevent memory issues
memory_threshold = 80.0
check_interval = 0.5
settings = {} # Empty dict for defaults
# Check if any URLs are documentation sites
has_doc_sites = any(is_documentation_site_func(url) for url in urls)
if has_doc_sites:
logger.info("Detected documentation sites in batch, using enhanced configuration")
# Use generic documentation selectors for batch crawling
@@ -85,7 +91,7 @@ class BatchCrawlStrategy:
cache_mode=CacheMode.BYPASS,
stream=True, # Enable streaming for faster parallel processing
markdown_generator=self.markdown_generator,
wait_for='body', # Simple selector for batch
wait_for="body", # Simple selector for batch
wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "30000")),
delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "1.0")),
@@ -93,7 +99,7 @@ class BatchCrawlStrategy:
scan_full_page=True, # Trigger lazy loading
exclude_all_images=False,
remove_overlay_elements=True,
process_iframes=True
process_iframes=True,
)
else:
# Configuration for regular batch crawling
@@ -104,27 +110,27 @@ class BatchCrawlStrategy:
wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"),
page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "45000")),
delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "0.5")),
scan_full_page=True
scan_full_page=True,
)
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=memory_threshold,
check_interval=check_interval,
max_session_permit=max_concurrent
max_session_permit=max_concurrent,
)
async def report_progress(percentage: int, message: str):
"""Helper to report progress if callback is available"""
if progress_callback:
await progress_callback('crawling', percentage, message)
await progress_callback("crawling", percentage, message)
total_urls = len(urls)
await report_progress(start_progress, f'Starting to crawl {total_urls} URLs...')
await report_progress(start_progress, f"Starting to crawl {total_urls} URLs...")
# Use configured batch size
successful_results = []
processed = 0
# Transform all URLs at the beginning
url_mapping = {} # Map transformed URLs back to original
transformed_urls = []
@@ -132,20 +138,29 @@ class BatchCrawlStrategy:
transformed = transform_url_func(url)
transformed_urls.append(transformed)
url_mapping[transformed] = url
for i in range(0, total_urls, batch_size):
batch_urls = transformed_urls[i:i + batch_size]
batch_urls = transformed_urls[i : i + batch_size]
batch_start = i
batch_end = min(i + batch_size, total_urls)
# Report batch start with smooth progress
progress_percentage = start_progress + int((i / total_urls) * (end_progress - start_progress))
await report_progress(progress_percentage, f'Processing batch {batch_start+1}-{batch_end} of {total_urls} URLs...')
progress_percentage = start_progress + int(
(i / total_urls) * (end_progress - start_progress)
)
await report_progress(
progress_percentage,
f"Processing batch {batch_start + 1}-{batch_end} of {total_urls} URLs...",
)
# Crawl this batch using arun_many with streaming
logger.info(f"Starting parallel crawl of batch {batch_start+1}-{batch_end} ({len(batch_urls)} URLs)")
batch_results = await self.crawler.arun_many(urls=batch_urls, config=crawl_config, dispatcher=dispatcher)
logger.info(
f"Starting parallel crawl of batch {batch_start + 1}-{batch_end} ({len(batch_urls)} URLs)"
)
batch_results = await self.crawler.arun_many(
urls=batch_urls, config=crawl_config, dispatcher=dispatcher
)
# Handle streaming results
j = 0
async for result in batch_results:
@@ -154,19 +169,31 @@ class BatchCrawlStrategy:
# Map back to original URL
original_url = url_mapping.get(result.url, result.url)
successful_results.append({
'url': original_url,
'markdown': result.markdown,
'html': result.html # Use raw HTML
"url": original_url,
"markdown": result.markdown,
"html": result.html, # Use raw HTML
})
else:
logger.warning(f"Failed to crawl {result.url}: {getattr(result, 'error_message', 'Unknown error')}")
logger.warning(
f"Failed to crawl {result.url}: {getattr(result, 'error_message', 'Unknown error')}"
)
# Report individual URL progress with smooth increments
progress_percentage = start_progress + int((processed / total_urls) * (end_progress - start_progress))
progress_percentage = start_progress + int(
(processed / total_urls) * (end_progress - start_progress)
)
# Report more frequently for smoother progress
if processed % 5 == 0 or processed == total_urls: # Report every 5 URLs or at the end
await report_progress(progress_percentage, f'Crawled {processed}/{total_urls} pages ({len(successful_results)} successful)')
if (
processed % 5 == 0 or processed == total_urls
): # Report every 5 URLs or at the end
await report_progress(
progress_percentage,
f"Crawled {processed}/{total_urls} pages ({len(successful_results)} successful)",
)
j += 1
await report_progress(end_progress, f'Batch crawling completed: {len(successful_results)}/{total_urls} pages successful')
return successful_results
await report_progress(
end_progress,
f"Batch crawling completed: {len(successful_results)}/{total_urls} pages successful",
)
return successful_results
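For reference, the batching pattern this strategy implements reduces to: slice the URL list into CRAWL_BATCH_SIZE chunks, then stream results from arun_many through the memory-adaptive dispatcher so that at most CRAWL_MAX_CONCURRENT pages are in flight. A condensed sketch, assuming a crawler, a run config with stream=True, and a dispatcher built as in the diff above (variable and function names are illustrative):

async def crawl_in_batches(crawler, urls, crawl_config, dispatcher, batch_size=50):
    results = []
    for i in range(0, len(urls), batch_size):
        batch = urls[i : i + batch_size]
        # With stream=True on the run config, arun_many yields results as pages finish.
        async for result in await crawler.arun_many(
            urls=batch, config=crawl_config, dispatcher=dispatcher
        ):
            if result.success:
                results.append(
                    {"url": result.url, "markdown": result.markdown, "html": result.html}
                )
    return results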

View File

@@ -61,7 +61,7 @@ class RecursiveCrawlStrategy:
await progress_callback('error', 0, 'Crawler not available')
return []
# Load settings from database
# Load settings from database - fail fast on configuration errors
try:
settings = await credential_service.get_credentials_by_category("rag_strategy")
batch_size = int(settings.get("CRAWL_BATCH_SIZE", "50"))
@@ -69,11 +69,16 @@ class RecursiveCrawlStrategy:
max_concurrent = int(settings.get("CRAWL_MAX_CONCURRENT", "10"))
memory_threshold = float(settings.get("MEMORY_THRESHOLD_PERCENT", "80"))
check_interval = float(settings.get("DISPATCHER_CHECK_INTERVAL", "0.5"))
except (ValueError, KeyError, TypeError) as e:
# Critical configuration errors should fail fast in alpha
logger.error(f"Invalid crawl settings format: {e}", exc_info=True)
raise ValueError(f"Failed to load crawler configuration: {e}")
except Exception as e:
logger.warning(f"Failed to load crawl settings: {e}, using defaults")
# For non-critical errors (e.g., network issues), use defaults but log prominently
logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True)
batch_size = 50
if max_concurrent is None:
max_concurrent = 10
max_concurrent = 10 # Safe default to prevent memory issues
memory_threshold = 80.0
check_interval = 0.5
settings = {} # Empty dict for defaults
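Both strategies now share the same load-or-fail contract: malformed configuration values abort the crawl immediately, while infrastructure failures (for example, the settings database being unreachable) fall back to the documented defaults and are logged as errors. Distilled into a standalone helper, the pattern is roughly (the function name is illustrative):

async def load_crawl_settings(credential_service, logger) -> dict:
    try:
        settings = await credential_service.get_credentials_by_category("rag_strategy")
        return {
            "batch_size": int(settings.get("CRAWL_BATCH_SIZE", "50")),
            "max_concurrent": int(settings.get("CRAWL_MAX_CONCURRENT", "10")),
            "memory_threshold": float(settings.get("MEMORY_THRESHOLD_PERCENT", "80")),
            "check_interval": float(settings.get("DISPATCHER_CHECK_INTERVAL", "0.5")),
        }
    except (ValueError, KeyError, TypeError) as e:
        # Bad values are a configuration bug: fail fast instead of crawling with surprises.
        raise ValueError(f"Failed to load crawler configuration: {e}") from e
    except Exception as e:
        # Transient errors: fall back to safe defaults, but log prominently.
        logger.error(f"Failed to load crawl settings: {e}, using defaults", exc_info=True)
        return {"batch_size": 50, "max_concurrent": 10, "memory_threshold": 80.0, "check_interval": 0.5}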