diff --git a/.env.example b/.env.example index f2cf73e..981f1da 100644 --- a/.env.example +++ b/.env.example @@ -33,4 +33,9 @@ EMBEDDING_DIMENSIONS=1536 # - OPENAI_API_KEY (encrypted) # - MODEL_CHOICE # - TRANSPORT settings -# - RAG strategy flags (USE_CONTEXTUAL_EMBEDDINGS, USE_HYBRID_SEARCH, etc.) \ No newline at end of file +# - RAG strategy flags (USE_CONTEXTUAL_EMBEDDINGS, USE_HYBRID_SEARCH, etc.) +# - Crawler settings: +# * CRAWL_MAX_CONCURRENT (default: 10) - Max concurrent pages per crawl operation +# * CRAWL_BATCH_SIZE (default: 50) - URLs processed per batch +# * MEMORY_THRESHOLD_PERCENT (default: 80) - Memory % before throttling +# * DISPATCHER_CHECK_INTERVAL (default: 0.5) - Memory check interval in seconds \ No newline at end of file diff --git a/python/src/server/services/crawling/crawling_service.py b/python/src/server/services/crawling/crawling_service.py index b450375..5b5d430 100644 --- a/python/src/server/services/crawling/crawling_service.py +++ b/python/src/server/services/crawling/crawling_service.py @@ -19,14 +19,20 @@ from ...utils import get_supabase_client update_crawl_progress = None complete_crawl_progress = None + def _ensure_socketio_imports(): """Ensure socket.IO handlers are imported.""" global update_crawl_progress, complete_crawl_progress if update_crawl_progress is None: - from ...api_routes.socketio_handlers import update_crawl_progress as _update, complete_crawl_progress as _complete + from ...api_routes.socketio_handlers import ( + update_crawl_progress as _update, + complete_crawl_progress as _complete, + ) + update_crawl_progress = _update complete_crawl_progress = _complete + # Import strategies from .strategies.batch import BatchCrawlStrategy from .strategies.recursive import RecursiveCrawlStrategy @@ -44,15 +50,15 @@ from .progress_mapper import ProgressMapper logger = get_logger(__name__) # Global registry to track active orchestration services for cancellation support -_active_orchestrations: Dict[str, 'CrawlingService'] = {} +_active_orchestrations: Dict[str, "CrawlingService"] = {} -def get_active_orchestration(progress_id: str) -> Optional['CrawlingService']: +def get_active_orchestration(progress_id: str) -> Optional["CrawlingService"]: """Get an active orchestration service by progress ID.""" return _active_orchestrations.get(progress_id) -def register_orchestration(progress_id: str, orchestration: 'CrawlingService'): +def register_orchestration(progress_id: str, orchestration: "CrawlingService"): """Register an active orchestration service.""" _active_orchestrations[progress_id] = orchestration @@ -68,11 +74,11 @@ class CrawlingService: Service class for web crawling and orchestration operations. Combines functionality from both CrawlingService and CrawlOrchestrationService. """ - + def __init__(self, crawler=None, supabase_client=None, progress_id=None): """ Initialize the crawling service. 
- + Args: crawler: The Crawl4AI crawler instance supabase_client: The Supabase client for database operations @@ -81,92 +87,97 @@ class CrawlingService: self.crawler = crawler self.supabase_client = supabase_client or get_supabase_client() self.progress_id = progress_id - + # Initialize helpers self.url_handler = URLHandler() self.site_config = SiteConfig() self.markdown_generator = self.site_config.get_markdown_generator() - + # Initialize strategies self.batch_strategy = BatchCrawlStrategy(crawler, self.markdown_generator) self.recursive_strategy = RecursiveCrawlStrategy(crawler, self.markdown_generator) self.single_page_strategy = SinglePageCrawlStrategy(crawler, self.markdown_generator) self.sitemap_strategy = SitemapCrawlStrategy() - + # Initialize operations self.doc_storage_ops = DocumentStorageOperations(self.supabase_client) - + # Track progress state across all stages to prevent UI resets - self.progress_state = {'progressId': self.progress_id} if self.progress_id else {} + self.progress_state = {"progressId": self.progress_id} if self.progress_id else {} # Initialize progress mapper to prevent backwards jumps self.progress_mapper = ProgressMapper() # Cancellation support self._cancelled = False - + def set_progress_id(self, progress_id: str): """Set the progress ID for Socket.IO updates.""" self.progress_id = progress_id if self.progress_id: - self.progress_state = {'progressId': self.progress_id} - + self.progress_state = {"progressId": self.progress_id} + def cancel(self): """Cancel the crawl operation.""" self._cancelled = True safe_logfire_info(f"Crawl operation cancelled | progress_id={self.progress_id}") - + def is_cancelled(self) -> bool: """Check if the crawl operation has been cancelled.""" return self._cancelled - + def _check_cancellation(self): """Check if cancelled and raise an exception if so.""" if self._cancelled: raise asyncio.CancelledError("Crawl operation was cancelled by user") - - async def _create_crawl_progress_callback(self, base_status: str) -> Callable[[str, int, str], Awaitable[None]]: + + async def _create_crawl_progress_callback( + self, base_status: str + ) -> Callable[[str, int, str], Awaitable[None]]: """Create a progress callback for crawling operations. - + Args: base_status: The base status to use for progress updates - + Returns: Async callback function with signature (status: str, percentage: int, message: str, **kwargs) -> None """ _ensure_socketio_imports() - + async def callback(status: str, percentage: int, message: str, **kwargs): if self.progress_id: # Update and preserve progress state self.progress_state.update({ - 'status': base_status, - 'percentage': percentage, - 'log': message, - **kwargs + "status": base_status, + "percentage": percentage, + "log": message, + **kwargs, }) - safe_logfire_info(f"Emitting crawl progress | progress_id={self.progress_id} | status={base_status} | percentage={percentage}") + safe_logfire_info( + f"Emitting crawl progress | progress_id={self.progress_id} | status={base_status} | percentage={percentage}" + ) await update_crawl_progress(self.progress_id, self.progress_state) + return callback - + async def _handle_progress_update(self, task_id: str, update: Dict[str, Any]) -> None: """ Handle progress updates from background task. 
- + Args: task_id: The task ID for the progress update update: Dictionary containing progress update data """ _ensure_socketio_imports() - + if self.progress_id: # Update and preserve progress state self.progress_state.update(update) # Ensure progressId is always included - if self.progress_id and 'progressId' not in self.progress_state: - self.progress_state['progressId'] = self.progress_id - + if self.progress_id and "progressId" not in self.progress_state: + self.progress_state["progressId"] = self.progress_id + # Always emit progress updates for real-time feedback await update_crawl_progress(self.progress_id, self.progress_state) - + # Simple delegation methods for backward compatibility async def crawl_single_page(self, url: str, retry_count: int = 3) -> Dict[str, Any]: """Crawl a single web page.""" @@ -174,27 +185,33 @@ class CrawlingService: url, self.url_handler.transform_github_url, self.site_config.is_documentation_site, - retry_count + retry_count, ) - - async def crawl_markdown_file(self, url: str, progress_callback=None, - start_progress: int = 10, end_progress: int = 20) -> List[Dict[str, Any]]: + + async def crawl_markdown_file( + self, url: str, progress_callback=None, start_progress: int = 10, end_progress: int = 20 + ) -> List[Dict[str, Any]]: """Crawl a .txt or markdown file.""" return await self.single_page_strategy.crawl_markdown_file( url, self.url_handler.transform_github_url, progress_callback, start_progress, - end_progress + end_progress, ) - + def parse_sitemap(self, sitemap_url: str) -> List[str]: """Parse a sitemap and extract URLs.""" return self.sitemap_strategy.parse_sitemap(sitemap_url) - - async def crawl_batch_with_progress(self, urls: List[str], max_concurrent: int = None, - progress_callback=None, start_progress: int = 15, - end_progress: int = 60) -> List[Dict[str, Any]]: + + async def crawl_batch_with_progress( + self, + urls: List[str], + max_concurrent: int = None, + progress_callback=None, + start_progress: int = 15, + end_progress: int = 60, + ) -> List[Dict[str, Any]]: """Batch crawl multiple URLs in parallel.""" return await self.batch_strategy.crawl_batch_with_progress( urls, @@ -203,12 +220,18 @@ class CrawlingService: max_concurrent, progress_callback, start_progress, - end_progress + end_progress, ) - - async def crawl_recursive_with_progress(self, start_urls: List[str], max_depth: int = 3, - max_concurrent: int = None, progress_callback=None, - start_progress: int = 10, end_progress: int = 60) -> List[Dict[str, Any]]: + + async def crawl_recursive_with_progress( + self, + start_urls: List[str], + max_depth: int = 3, + max_concurrent: int = None, + progress_callback=None, + start_progress: int = 10, + end_progress: int = 60, + ) -> List[Dict[str, Any]]: """Recursively crawl internal links from start URLs.""" return await self.recursive_strategy.crawl_recursive_with_progress( start_urls, @@ -218,150 +241,164 @@ class CrawlingService: max_concurrent, progress_callback, start_progress, - end_progress + end_progress, ) - + # Orchestration methods async def orchestrate_crawl(self, request: Dict[str, Any]) -> Dict[str, Any]: """ Main orchestration method - non-blocking using asyncio.create_task. - + Args: request: The crawl request containing url, knowledge_type, tags, max_depth, etc. 
- + Returns: Dict containing task_id and status """ - url = str(request.get('url', '')) + url = str(request.get("url", "")) safe_logfire_info(f"Starting background crawl orchestration | url={url}") - + # Create task ID task_id = self.progress_id or str(uuid.uuid4()) - + # Register this orchestration service for cancellation support if self.progress_id: register_orchestration(self.progress_id, self) - + # Start the crawl as an async task in the main event loop asyncio.create_task(self._async_orchestrate_crawl(request, task_id)) - + # Return immediately return { "task_id": task_id, "status": "started", "message": f"Crawl operation started for {url}", - "progress_id": self.progress_id + "progress_id": self.progress_id, } - + async def _async_orchestrate_crawl(self, request: Dict[str, Any], task_id: str): """ Async orchestration that runs in the main event loop. """ last_heartbeat = asyncio.get_event_loop().time() heartbeat_interval = 30.0 # Send heartbeat every 30 seconds - + async def send_heartbeat_if_needed(): """Send heartbeat to keep Socket.IO connection alive""" nonlocal last_heartbeat current_time = asyncio.get_event_loop().time() if current_time - last_heartbeat >= heartbeat_interval: - await self._handle_progress_update(task_id, { - 'status': self.progress_mapper.get_current_stage(), - 'percentage': self.progress_mapper.get_current_progress(), - 'heartbeat': True, - 'log': 'Background task still running...', - 'message': 'Processing...' - }) + await self._handle_progress_update( + task_id, + { + "status": self.progress_mapper.get_current_stage(), + "percentage": self.progress_mapper.get_current_progress(), + "heartbeat": True, + "log": "Background task still running...", + "message": "Processing...", + }, + ) last_heartbeat = current_time - + try: - url = str(request.get('url', '')) + url = str(request.get("url", "")) safe_logfire_info(f"Starting async crawl orchestration | url={url} | task_id={task_id}") - + # Extract source_id from the original URL parsed_original_url = urlparse(url) original_source_id = parsed_original_url.netloc or parsed_original_url.path safe_logfire_info(f"Using source_id '{original_source_id}' from original URL '{url}'") - + # Helper to update progress with mapper - async def update_mapped_progress(stage: str, stage_progress: int, message: str, **kwargs): + async def update_mapped_progress( + stage: str, stage_progress: int, message: str, **kwargs + ): overall_progress = self.progress_mapper.map_progress(stage, stage_progress) - await self._handle_progress_update(task_id, { - 'status': stage, - 'percentage': overall_progress, - 'log': message, - 'message': message, - **kwargs - }) - + await self._handle_progress_update( + task_id, + { + "status": stage, + "percentage": overall_progress, + "log": message, + "message": message, + **kwargs, + }, + ) + # Initial progress - await update_mapped_progress('starting', 100, f'Starting crawl of {url}', currentUrl=url) - + await update_mapped_progress( + "starting", 100, f"Starting crawl of {url}", currentUrl=url + ) + # Check for cancellation before proceeding self._check_cancellation() - + # Analyzing stage - await update_mapped_progress('analyzing', 50, f'Analyzing URL type for {url}') - + await update_mapped_progress("analyzing", 50, f"Analyzing URL type for {url}") + # Detect URL type and perform crawl crawl_results, crawl_type = await self._crawl_by_url_type(url, request) - + # Check for cancellation after crawling self._check_cancellation() - + # Send heartbeat after potentially long crawl operation await 
send_heartbeat_if_needed() - + if not crawl_results: raise ValueError("No content was crawled from the provided URL") - + # Processing stage - await update_mapped_progress('processing', 50, 'Processing crawled content') - + await update_mapped_progress("processing", 50, "Processing crawled content") + # Check for cancellation before document processing self._check_cancellation() - + # Process and store documents using document storage operations - async def doc_storage_callback(message: str, percentage: int, batch_info: Optional[dict] = None): + async def doc_storage_callback( + message: str, percentage: int, batch_info: Optional[dict] = None + ): if self.progress_id: _ensure_socketio_imports() # Map percentage to document storage range (20-85%) mapped_percentage = 20 + int((percentage / 100) * (85 - 20)) - safe_logfire_info(f"Document storage progress mapping: {percentage}% -> {mapped_percentage}%") - + safe_logfire_info( + f"Document storage progress mapping: {percentage}% -> {mapped_percentage}%" + ) + # Update progress state while preserving existing fields self.progress_state.update({ - 'status': 'document_storage', - 'percentage': mapped_percentage, - 'log': message + "status": "document_storage", + "percentage": mapped_percentage, + "log": message, }) - + # Add batch_info fields if provided if batch_info: self.progress_state.update(batch_info) - + await update_crawl_progress(self.progress_id, self.progress_state) - + storage_results = await self.doc_storage_ops.process_and_store_documents( crawl_results, request, crawl_type, original_source_id, doc_storage_callback, - self._check_cancellation + self._check_cancellation, ) - + # Check for cancellation after document storage self._check_cancellation() - + # Send heartbeat after document storage await send_heartbeat_if_needed() - + # Extract code examples if requested code_examples_count = 0 - if request.get('extract_code_examples', True): - await update_mapped_progress('code_extraction', 0, 'Starting code extraction...') - + if request.get("extract_code_examples", True): + await update_mapped_progress("code_extraction", 0, "Starting code extraction...") + # Create progress callback for code extraction async def code_progress_callback(data: dict): if self.progress_id: @@ -369,156 +406,169 @@ class CrawlingService: # Update progress state while preserving existing fields self.progress_state.update(data) await update_crawl_progress(self.progress_id, self.progress_state) - + code_examples_count = await self.doc_storage_ops.extract_and_store_code_examples( crawl_results, - storage_results['url_to_full_document'], + storage_results["url_to_full_document"], code_progress_callback, 85, - 95 + 95, ) - + # Send heartbeat after code extraction await send_heartbeat_if_needed() - + # Finalization await update_mapped_progress( - 'finalization', 50, 'Finalizing crawl results...', - chunks_stored=storage_results['chunk_count'], - code_examples_found=code_examples_count + "finalization", + 50, + "Finalizing crawl results...", + chunks_stored=storage_results["chunk_count"], + code_examples_found=code_examples_count, ) - + # Complete - send both the progress update and completion event await update_mapped_progress( - 'completed', 100, - f'Crawl completed: {storage_results["chunk_count"]} chunks, {code_examples_count} code examples', - chunks_stored=storage_results['chunk_count'], + "completed", + 100, + f"Crawl completed: {storage_results['chunk_count']} chunks, {code_examples_count} code examples", + chunks_stored=storage_results["chunk_count"], 
code_examples_found=code_examples_count, processed_pages=len(crawl_results), - total_pages=len(crawl_results) + total_pages=len(crawl_results), ) - + # Also send the completion event that frontend expects _ensure_socketio_imports() - await complete_crawl_progress(task_id, { - 'chunks_stored': storage_results['chunk_count'], - 'code_examples_found': code_examples_count, - 'processed_pages': len(crawl_results), - 'total_pages': len(crawl_results), - 'sourceId': storage_results.get('source_id', ''), - 'log': 'Crawl completed successfully!' - }) - + await complete_crawl_progress( + task_id, + { + "chunks_stored": storage_results["chunk_count"], + "code_examples_found": code_examples_count, + "processed_pages": len(crawl_results), + "total_pages": len(crawl_results), + "sourceId": storage_results.get("source_id", ""), + "log": "Crawl completed successfully!", + }, + ) + # Unregister after successful completion if self.progress_id: unregister_orchestration(self.progress_id) - safe_logfire_info(f"Unregistered orchestration service after completion | progress_id={self.progress_id}") - + safe_logfire_info( + f"Unregistered orchestration service after completion | progress_id={self.progress_id}" + ) + except asyncio.CancelledError: safe_logfire_info(f"Crawl operation cancelled | progress_id={self.progress_id}") - await self._handle_progress_update(task_id, { - 'status': 'cancelled', - 'percentage': -1, - 'log': 'Crawl operation was cancelled by user' - }) + await self._handle_progress_update( + task_id, + { + "status": "cancelled", + "percentage": -1, + "log": "Crawl operation was cancelled by user", + }, + ) # Unregister on cancellation if self.progress_id: unregister_orchestration(self.progress_id) - safe_logfire_info(f"Unregistered orchestration service on cancellation | progress_id={self.progress_id}") + safe_logfire_info( + f"Unregistered orchestration service on cancellation | progress_id={self.progress_id}" + ) except Exception as e: safe_logfire_error(f"Async crawl orchestration failed | error={str(e)}") - await self._handle_progress_update(task_id, { - 'status': 'error', - 'percentage': -1, - 'log': f'Crawl failed: {str(e)}' - }) + await self._handle_progress_update( + task_id, {"status": "error", "percentage": -1, "log": f"Crawl failed: {str(e)}"} + ) # Unregister on error if self.progress_id: unregister_orchestration(self.progress_id) - safe_logfire_info(f"Unregistered orchestration service on error | progress_id={self.progress_id}") - + safe_logfire_info( + f"Unregistered orchestration service on error | progress_id={self.progress_id}" + ) + async def _crawl_by_url_type(self, url: str, request: Dict[str, Any]) -> tuple: """ Detect URL type and perform appropriate crawling. - + Returns: Tuple of (crawl_results, crawl_type) """ _ensure_socketio_imports() - + crawl_results = [] crawl_type = None - + if self.url_handler.is_txt(url): # Handle text files if self.progress_id: self.progress_state.update({ - 'status': 'crawling', - 'percentage': 10, - 'log': 'Detected text file, fetching content...' 
+ "status": "crawling", + "percentage": 10, + "log": "Detected text file, fetching content...", }) await update_crawl_progress(self.progress_id, self.progress_state) crawl_results = await self.crawl_markdown_file( url, - progress_callback=await self._create_crawl_progress_callback('crawling'), + progress_callback=await self._create_crawl_progress_callback("crawling"), start_progress=10, - end_progress=20 + end_progress=20, ) crawl_type = "text_file" - + elif self.url_handler.is_sitemap(url): # Handle sitemaps if self.progress_id: self.progress_state.update({ - 'status': 'crawling', - 'percentage': 10, - 'log': 'Detected sitemap, parsing URLs...' + "status": "crawling", + "percentage": 10, + "log": "Detected sitemap, parsing URLs...", }) await update_crawl_progress(self.progress_id, self.progress_state) sitemap_urls = self.parse_sitemap(url) - + if sitemap_urls: # Emit progress before starting batch crawl if self.progress_id: self.progress_state.update({ - 'status': 'crawling', - 'percentage': 15, - 'log': f'Starting batch crawl of {len(sitemap_urls)} URLs...' + "status": "crawling", + "percentage": 15, + "log": f"Starting batch crawl of {len(sitemap_urls)} URLs...", }) await update_crawl_progress(self.progress_id, self.progress_state) - + crawl_results = await self.crawl_batch_with_progress( sitemap_urls, - progress_callback=await self._create_crawl_progress_callback('crawling'), + progress_callback=await self._create_crawl_progress_callback("crawling"), start_progress=15, - end_progress=20 + end_progress=20, ) crawl_type = "sitemap" - + else: # Handle regular webpages with recursive crawling if self.progress_id: self.progress_state.update({ - 'status': 'crawling', - 'percentage': 10, - 'log': f'Starting recursive crawl with max depth {request.get("max_depth", 1)}...' + "status": "crawling", + "percentage": 10, + "log": f"Starting recursive crawl with max depth {request.get('max_depth', 1)}...", }) await update_crawl_progress(self.progress_id, self.progress_state) - - max_depth = request.get('max_depth', 1) - # Limit concurrent crawls for better performance - max_concurrent = 20 if self.site_config.is_documentation_site(url) else 10 + + max_depth = request.get("max_depth", 1) + # Let the strategy handle concurrency from settings + # This will use CRAWL_MAX_CONCURRENT from database (default: 10) crawl_results = await self.crawl_recursive_with_progress( [url], max_depth=max_depth, - max_concurrent=max_concurrent, - progress_callback=await self._create_crawl_progress_callback('crawling'), + max_concurrent=None, # Let strategy use settings + progress_callback=await self._create_crawl_progress_callback("crawling"), start_progress=10, - end_progress=20 + end_progress=20, ) crawl_type = "webpage" - + return crawl_results, crawl_type diff --git a/python/src/server/services/crawling/strategies/batch.py b/python/src/server/services/crawling/strategies/batch.py index 2dc18fd..d97b0bc 100644 --- a/python/src/server/services/crawling/strategies/batch.py +++ b/python/src/server/services/crawling/strategies/batch.py @@ -3,6 +3,7 @@ Batch Crawling Strategy Handles batch crawling of multiple URLs in parallel. """ + import asyncio from typing import List, Dict, Any, Optional, Callable @@ -15,18 +16,18 @@ logger = get_logger(__name__) class BatchCrawlStrategy: """Strategy for crawling multiple URLs in batch.""" - + def __init__(self, crawler, markdown_generator): """ Initialize batch crawl strategy. 
- + Args: crawler (AsyncWebCrawler): The Crawl4AI crawler instance for web crawling operations markdown_generator (DefaultMarkdownGenerator): The markdown generator instance for converting HTML to markdown """ self.crawler = crawler self.markdown_generator = markdown_generator - + async def crawl_batch_with_progress( self, urls: List[str], @@ -35,11 +36,11 @@ class BatchCrawlStrategy: max_concurrent: int = None, progress_callback: Optional[Callable] = None, start_progress: int = 15, - end_progress: int = 60 + end_progress: int = 60, ) -> List[Dict[str, Any]]: """ Batch crawl multiple URLs in parallel with progress reporting. - + Args: urls: List of URLs to crawl transform_url_func: Function to transform URLs (e.g., GitHub URLs) @@ -48,17 +49,17 @@ class BatchCrawlStrategy: progress_callback: Optional callback for progress updates start_progress: Starting progress percentage end_progress: Ending progress percentage - + Returns: List of crawl results """ if not self.crawler: logger.error("No crawler instance available for batch crawling") if progress_callback: - await progress_callback('error', 0, 'Crawler not available') + await progress_callback("error", 0, "Crawler not available") return [] - - # Load settings from database first + + # Load settings from database - fail fast on configuration errors try: settings = await credential_service.get_credentials_by_category("rag_strategy") batch_size = int(settings.get("CRAWL_BATCH_SIZE", "50")) @@ -66,18 +67,23 @@ class BatchCrawlStrategy: max_concurrent = int(settings.get("CRAWL_MAX_CONCURRENT", "10")) memory_threshold = float(settings.get("MEMORY_THRESHOLD_PERCENT", "80")) check_interval = float(settings.get("DISPATCHER_CHECK_INTERVAL", "0.5")) + except (ValueError, KeyError, TypeError) as e: + # Critical configuration errors should fail fast in alpha + logger.error(f"Invalid crawl settings format: {e}", exc_info=True) + raise ValueError(f"Failed to load crawler configuration: {e}") except Exception as e: - logger.warning(f"Failed to load crawl settings: {e}, using defaults") + # For non-critical errors (e.g., network issues), use defaults but log prominently + logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True) batch_size = 50 if max_concurrent is None: - max_concurrent = 10 + max_concurrent = 10 # Safe default to prevent memory issues memory_threshold = 80.0 check_interval = 0.5 settings = {} # Empty dict for defaults - + # Check if any URLs are documentation sites has_doc_sites = any(is_documentation_site_func(url) for url in urls) - + if has_doc_sites: logger.info("Detected documentation sites in batch, using enhanced configuration") # Use generic documentation selectors for batch crawling @@ -85,7 +91,7 @@ class BatchCrawlStrategy: cache_mode=CacheMode.BYPASS, stream=True, # Enable streaming for faster parallel processing markdown_generator=self.markdown_generator, - wait_for='body', # Simple selector for batch + wait_for="body", # Simple selector for batch wait_until=settings.get("CRAWL_WAIT_STRATEGY", "domcontentloaded"), page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "30000")), delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "1.0")), @@ -93,7 +99,7 @@ class BatchCrawlStrategy: scan_full_page=True, # Trigger lazy loading exclude_all_images=False, remove_overlay_elements=True, - process_iframes=True + process_iframes=True, ) else: # Configuration for regular batch crawling @@ -104,27 +110,27 @@ class BatchCrawlStrategy: wait_until=settings.get("CRAWL_WAIT_STRATEGY", 
"domcontentloaded"), page_timeout=int(settings.get("CRAWL_PAGE_TIMEOUT", "45000")), delay_before_return_html=float(settings.get("CRAWL_DELAY_BEFORE_HTML", "0.5")), - scan_full_page=True + scan_full_page=True, ) - + dispatcher = MemoryAdaptiveDispatcher( memory_threshold_percent=memory_threshold, check_interval=check_interval, - max_session_permit=max_concurrent + max_session_permit=max_concurrent, ) - + async def report_progress(percentage: int, message: str): """Helper to report progress if callback is available""" if progress_callback: - await progress_callback('crawling', percentage, message) - + await progress_callback("crawling", percentage, message) + total_urls = len(urls) - await report_progress(start_progress, f'Starting to crawl {total_urls} URLs...') - + await report_progress(start_progress, f"Starting to crawl {total_urls} URLs...") + # Use configured batch size successful_results = [] processed = 0 - + # Transform all URLs at the beginning url_mapping = {} # Map transformed URLs back to original transformed_urls = [] @@ -132,20 +138,29 @@ class BatchCrawlStrategy: transformed = transform_url_func(url) transformed_urls.append(transformed) url_mapping[transformed] = url - + for i in range(0, total_urls, batch_size): - batch_urls = transformed_urls[i:i + batch_size] + batch_urls = transformed_urls[i : i + batch_size] batch_start = i batch_end = min(i + batch_size, total_urls) - + # Report batch start with smooth progress - progress_percentage = start_progress + int((i / total_urls) * (end_progress - start_progress)) - await report_progress(progress_percentage, f'Processing batch {batch_start+1}-{batch_end} of {total_urls} URLs...') - + progress_percentage = start_progress + int( + (i / total_urls) * (end_progress - start_progress) + ) + await report_progress( + progress_percentage, + f"Processing batch {batch_start + 1}-{batch_end} of {total_urls} URLs...", + ) + # Crawl this batch using arun_many with streaming - logger.info(f"Starting parallel crawl of batch {batch_start+1}-{batch_end} ({len(batch_urls)} URLs)") - batch_results = await self.crawler.arun_many(urls=batch_urls, config=crawl_config, dispatcher=dispatcher) - + logger.info( + f"Starting parallel crawl of batch {batch_start + 1}-{batch_end} ({len(batch_urls)} URLs)" + ) + batch_results = await self.crawler.arun_many( + urls=batch_urls, config=crawl_config, dispatcher=dispatcher + ) + # Handle streaming results j = 0 async for result in batch_results: @@ -154,19 +169,31 @@ class BatchCrawlStrategy: # Map back to original URL original_url = url_mapping.get(result.url, result.url) successful_results.append({ - 'url': original_url, - 'markdown': result.markdown, - 'html': result.html # Use raw HTML + "url": original_url, + "markdown": result.markdown, + "html": result.html, # Use raw HTML }) else: - logger.warning(f"Failed to crawl {result.url}: {getattr(result, 'error_message', 'Unknown error')}") - + logger.warning( + f"Failed to crawl {result.url}: {getattr(result, 'error_message', 'Unknown error')}" + ) + # Report individual URL progress with smooth increments - progress_percentage = start_progress + int((processed / total_urls) * (end_progress - start_progress)) + progress_percentage = start_progress + int( + (processed / total_urls) * (end_progress - start_progress) + ) # Report more frequently for smoother progress - if processed % 5 == 0 or processed == total_urls: # Report every 5 URLs or at the end - await report_progress(progress_percentage, f'Crawled {processed}/{total_urls} pages ({len(successful_results)} 
successful)') + if ( + processed % 5 == 0 or processed == total_urls + ): # Report every 5 URLs or at the end + await report_progress( + progress_percentage, + f"Crawled {processed}/{total_urls} pages ({len(successful_results)} successful)", + ) j += 1 - - await report_progress(end_progress, f'Batch crawling completed: {len(successful_results)}/{total_urls} pages successful') - return successful_results \ No newline at end of file + + await report_progress( + end_progress, + f"Batch crawling completed: {len(successful_results)}/{total_urls} pages successful", + ) + return successful_results diff --git a/python/src/server/services/crawling/strategies/recursive.py b/python/src/server/services/crawling/strategies/recursive.py index 675c97f..8b9cf93 100644 --- a/python/src/server/services/crawling/strategies/recursive.py +++ b/python/src/server/services/crawling/strategies/recursive.py @@ -61,7 +61,7 @@ class RecursiveCrawlStrategy: await progress_callback('error', 0, 'Crawler not available') return [] - # Load settings from database + # Load settings from database - fail fast on configuration errors try: settings = await credential_service.get_credentials_by_category("rag_strategy") batch_size = int(settings.get("CRAWL_BATCH_SIZE", "50")) @@ -69,11 +69,16 @@ class RecursiveCrawlStrategy: max_concurrent = int(settings.get("CRAWL_MAX_CONCURRENT", "10")) memory_threshold = float(settings.get("MEMORY_THRESHOLD_PERCENT", "80")) check_interval = float(settings.get("DISPATCHER_CHECK_INTERVAL", "0.5")) + except (ValueError, KeyError, TypeError) as e: + # Critical configuration errors should fail fast in alpha + logger.error(f"Invalid crawl settings format: {e}", exc_info=True) + raise ValueError(f"Failed to load crawler configuration: {e}") except Exception as e: - logger.warning(f"Failed to load crawl settings: {e}, using defaults") + # For non-critical errors (e.g., network issues), use defaults but log prominently + logger.error(f"Failed to load crawl settings from database: {e}, using defaults", exc_info=True) batch_size = 50 if max_concurrent is None: - max_concurrent = 10 + max_concurrent = 10 # Safe default to prevent memory issues memory_threshold = 80.0 check_interval = 0.5 settings = {} # Empty dict for defaults
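
A minimal sketch of the settings-loading pattern now shared by batch.py and recursive.py: malformed values (ValueError/KeyError/TypeError) fail fast, while other failures such as an unreachable database fall back to the documented defaults. The load_rag_settings argument below is a hypothetical stand-in for credential_service.get_credentials_by_category("rag_strategy"); everything else mirrors the diff.

    # Sketch only: mirrors the fail-fast vs. fallback split introduced above.
    import logging

    logger = logging.getLogger(__name__)

    async def load_crawl_settings(load_rag_settings) -> dict:
        try:
            settings = await load_rag_settings()
            return {
                "batch_size": int(settings.get("CRAWL_BATCH_SIZE", "50")),
                "max_concurrent": int(settings.get("CRAWL_MAX_CONCURRENT", "10")),
                "memory_threshold": float(settings.get("MEMORY_THRESHOLD_PERCENT", "80")),
                "check_interval": float(settings.get("DISPATCHER_CHECK_INTERVAL", "0.5")),
            }
        except (ValueError, KeyError, TypeError) as e:
            # Malformed configuration: surface it instead of crawling with
            # values the operator did not intend.
            logger.error(f"Invalid crawl settings format: {e}", exc_info=True)
            raise ValueError(f"Failed to load crawler configuration: {e}")
        except Exception as e:
            # Transient failures (e.g. database unreachable): log loudly and
            # fall back to the documented defaults so the crawl can still run.
            logger.error(f"Failed to load crawl settings: {e}, using defaults", exc_info=True)
            return {
                "batch_size": 50,
                "max_concurrent": 10,
                "memory_threshold": 80.0,
                "check_interval": 0.5,
            }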
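
The batch strategy hands concurrency control to Crawl4AI's MemoryAdaptiveDispatcher, which stops opening new sessions once system memory passes MEMORY_THRESHOLD_PERCENT and re-checks every DISPATCHER_CHECK_INTERVAL seconds. A stripped-down sketch of that wiring, assuming the names used in the diff (AsyncWebCrawler, CrawlerRunConfig, CacheMode, MemoryAdaptiveDispatcher) are importable from the crawl4ai top level; in some versions MemoryAdaptiveDispatcher lives in crawl4ai.async_dispatcher instead.

    # Sketch only: streaming arun_many with a memory-adaptive dispatcher,
    # using the same parameters the batch strategy reads from settings.
    import asyncio

    from crawl4ai import AsyncWebCrawler, CacheMode, CrawlerRunConfig, MemoryAdaptiveDispatcher

    async def crawl_stream(urls: list[str]) -> list[dict]:
        config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,
            stream=True,  # yield results as each page finishes
        )
        dispatcher = MemoryAdaptiveDispatcher(
            memory_threshold_percent=80.0,  # MEMORY_THRESHOLD_PERCENT
            check_interval=0.5,             # DISPATCHER_CHECK_INTERVAL
            max_session_permit=10,          # CRAWL_MAX_CONCURRENT
        )
        results = []
        async with AsyncWebCrawler() as crawler:
            async for result in await crawler.arun_many(
                urls=urls, config=config, dispatcher=dispatcher
            ):
                if result.success:
                    results.append({"url": result.url, "markdown": result.markdown})
        return results

    if __name__ == "__main__":
        print(asyncio.run(crawl_stream(["https://example.com"])))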
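
The doc_storage_callback in the orchestration code compresses the document-storage sub-task's 0-100% progress into the 20-85% slice of the overall crawl bar, with code extraction taking 85-95% afterwards. The arithmetic, pulled out as a worked example:

    # Same formula as `mapped_percentage = 20 + int((percentage / 100) * (85 - 20))`.
    def map_to_band(percentage: int, band_start: int = 20, band_end: int = 85) -> int:
        return band_start + int((percentage / 100) * (band_end - band_start))

    assert map_to_band(0) == 20
    assert map_to_band(50) == 52   # 20 + int(0.5 * 65)
    assert map_to_band(100) == 85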
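
send_heartbeat_if_needed keeps the Socket.IO connection alive during long stages by emitting a lightweight progress event at most every 30 seconds. A small sketch of the same idea; emit here is a hypothetical stand-in for _handle_progress_update, and time.monotonic() replaces the event-loop clock used in the diff, which is equivalent for this purpose.

    # Sketch only: rate-limited heartbeat between long-running stages.
    import time

    class Heartbeat:
        def __init__(self, interval: float = 30.0):
            self.interval = interval
            self._last = time.monotonic()

        async def beat_if_needed(self, emit) -> None:
            # `emit` is any coroutine that pushes a progress payload to the client.
            now = time.monotonic()
            if now - self._last >= self.interval:
                await emit({"heartbeat": True, "log": "Background task still running..."})
                self._last = now

The orchestrator calls this after each potentially long step (the crawl itself, document storage, code extraction), so quiet periods never exceed the interval.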
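
Cancellation is cooperative: each service registers itself in a module-level dict keyed by progress_id, an API route can look it up and call cancel(), and _check_cancellation() raises asyncio.CancelledError at the next checkpoint, after which the entry is unregistered. A self-contained sketch of that pattern with illustrative names, not the service's actual API:

    import asyncio
    from typing import Dict, Optional

    _active: Dict[str, "Orchestration"] = {}

    def get_active(progress_id: str) -> Optional["Orchestration"]:
        return _active.get(progress_id)

    class Orchestration:
        def __init__(self, progress_id: str):
            self.progress_id = progress_id
            self._cancelled = False

        def cancel(self) -> None:
            self._cancelled = True

        def _check_cancellation(self) -> None:
            if self._cancelled:
                raise asyncio.CancelledError("cancelled by user")

        async def run(self) -> None:
            try:
                for _ in range(20):                     # stand-in for crawl stages
                    self._check_cancellation()
                    await asyncio.sleep(0.05)
            except asyncio.CancelledError:
                pass                                    # emit a "cancelled" progress update here
            finally:
                _active.pop(self.progress_id, None)     # always unregister

    async def main() -> None:
        orch = Orchestration("demo")
        _active["demo"] = orch
        task = asyncio.create_task(orch.run())
        await asyncio.sleep(0.2)
        if (active := get_active("demo")) is not None:
            active.cancel()                             # what a cancel endpoint would do
        await task

    asyncio.run(main())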
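
orchestrate_crawl itself never blocks: it registers the service, schedules _async_orchestrate_crawl with asyncio.create_task, and returns a task_id immediately so the caller can subscribe to progress events. A toy sketch of that shape, with illustrative function names:

    import asyncio
    import uuid

    async def _run_crawl(task_id: str, url: str) -> None:
        await asyncio.sleep(1)                          # stand-in for the actual crawl stages
        print(f"[{task_id}] finished crawling {url}")

    async def orchestrate(url: str) -> dict:
        task_id = str(uuid.uuid4())
        asyncio.create_task(_run_crawl(task_id, url))   # work continues in the background
        return {
            "task_id": task_id,
            "status": "started",
            "message": f"Crawl operation started for {url}",
        }

    async def main() -> None:
        print(await orchestrate("https://example.com"))  # returns immediately
        await asyncio.sleep(1.5)                         # keep the demo loop alive briefly

    asyncio.run(main())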