* Fix race condition in concurrent crawling with unique source IDs
  - Add unique hash-based source_id generation to prevent conflicts (a sketch of the hashing scheme follows this commit list)
  - Separate source identification from display with three fields:
    - source_id: 16-char SHA256 hash for unique identification
    - source_url: Original URL for tracking
    - source_display_name: Human-friendly name for UI
  - Add comprehensive test suite validating the fix
  - Migrate existing data with backward compatibility

* Fix title generation to use source_display_name for better AI context
  - Pass source_display_name to the title generation function
  - Use display name in AI prompt instead of hash-based source_id
  - Results in more specific, meaningful titles for each source

* Skip AI title generation when display name is available
  - Use source_display_name directly as the title to avoid unnecessary AI calls
  - More efficient and predictable than AI-generated titles
  - Keep AI generation only as a fallback for backward compatibility

* Fix critical issues from code review
  - Add missing os import to prevent NameError crash
  - Remove unused imports (pytest, Mock, patch, hashlib, urlparse, etc.)
  - Fix GitHub API capitalization consistency
  - Reuse existing DocumentStorageService instance
  - Update test expectations to match corrected capitalization

  Addresses CodeRabbit review feedback on PR #472

* Add safety improvements from code review
  - Truncate display names to 100 chars when used as titles
  - Document hash collision probability (negligible for <1M sources)

  Simple, pragmatic fixes per KISS principle

* Fix code extraction to use hash-based source_ids and improve display names
  - Fixed critical bug where code extraction was using old domain-based source_ids
  - Updated code extraction service to accept source_id as a parameter instead of extracting it from the URL
  - Added special handling for llms.txt and sitemap.xml files in display names
  - Added comprehensive tests for source_id handling in code extraction
  - Removed unused urlparse import from code_extraction_service.py

  This fixes the foreign key constraint errors that were preventing code examples from being stored after the source_id architecture refactor.

  Co-Authored-By: Claude <noreply@anthropic.com>

* Fix critical variable shadowing and source_type determination issues
  - Fixed variable shadowing in document_storage_operations.py where the source_url parameter was being overwritten by document URLs, causing an incorrect source_url in the database
  - Fixed source_type determination to use actual URLs instead of the hash-based source_id
  - Added comprehensive tests for source URL preservation
  - Ensure source_type is correctly set to "file" for file uploads and "url" for web crawls

  The variable shadowing bug was causing sitemap sources to have the wrong source_url (the last crawled page instead of the sitemap URL). The source_type bug would mark all sources as "url" even for file uploads, because hash-based IDs do not start with "file_".

  Co-Authored-By: Claude <noreply@anthropic.com>

* Fix URL canonicalization and document metrics calculation
  - Implement proper URL canonicalization to prevent duplicate sources (see the canonicalization sketch after this list)
    - Remove trailing slashes (except root)
    - Remove URL fragments
    - Remove tracking parameters (utm_*, gclid, fbclid, etc.)
    - Sort query parameters for consistency
    - Remove default ports (80 for HTTP, 443 for HTTPS)
    - Normalize scheme and domain to lowercase
  - Fix avg_chunks_per_doc calculation to avoid division by zero
    - Track processed_docs count separately from total crawl_results
    - Handle all-empty document sets gracefully
    - Show processed/total in logs for better visibility
  - Add comprehensive tests for both fixes
    - 10 test cases for URL canonicalization edge cases
    - 4 test cases for document metrics calculation

  This prevents database constraint violations when crawling the same content with URL variations and provides accurate metrics in logs.

* Fix synchronous extract_source_summary blocking async event loop
  - Run extract_source_summary in a thread pool using asyncio.to_thread (see the offloading sketch after this list)
  - Prevents blocking the async event loop during AI summary generation
  - Preserves exact error handling and fallback behavior
  - Variables (source_id, combined_content) properly passed to the thread

  Added comprehensive tests verifying:
  - Function runs in a thread without blocking
  - Error handling works correctly with fallback
  - Multiple sources can be processed
  - Thread safety with variable passing

* Fix synchronous update_source_info blocking async event loop
  - Run update_source_info in a thread pool using asyncio.to_thread
  - Prevents blocking the async event loop during database operations
  - Preserves exact error handling and fallback behavior
  - All kwargs properly passed to thread execution

  Added comprehensive tests verifying:
  - Function runs in a thread without blocking
  - Error handling triggers fallback correctly
  - All kwargs are preserved when passed to the thread
  - Existing extract_source_summary tests still pass

* Fix race condition in source creation using upsert
  - Replace INSERT with UPSERT for new sources to prevent PRIMARY KEY violations (see the upsert sketch after this list)
  - Handles concurrent crawls attempting to create the same source
  - Maintains existing UPDATE behavior for sources that already exist

  Added comprehensive tests verifying:
  - Concurrent source creation doesn't fail
  - Upsert is used for new sources (not insert)
  - Update is still used for existing sources
  - Async concurrent operations work correctly
  - Race conditions with delays are handled

  This prevents database constraint errors when multiple crawls target the same URL simultaneously.

* Add migration detection UI components

  Add MigrationBanner component with clear user instructions for database schema updates. Add useMigrationStatus hook for periodic health check monitoring with graceful error handling.

* Integrate migration banner into main app

  Add migration status monitoring and banner display to App.tsx. Shows the migration banner when database schema updates are required.

* Enhance backend startup error instructions

  Add detailed Docker restart instructions and migration script guidance. Improves the user experience when encountering startup failures.

* Add database schema caching to health endpoint

  Implement smart caching for schema validation to prevent repeated database queries. Cache successful validations permanently and throttle failures to 30-second intervals. Replace debug prints with proper logging.

* Clean up knowledge API imports and logging

  Remove duplicate import statements and redundant logging. Improves code clarity and reduces log noise.

* Remove unused instructions prop from MigrationBanner

  Clean up the component API by removing the instructions prop that was accepted but never rendered. Simplifies the interface and eliminates dead code while keeping the functional hardcoded migration steps.
* Add schema_valid flag to migration_required health response

  Add schema_valid: false flag to health endpoint response when database schema migration is required. Improves API consistency without changing existing behavior.

---------

Co-authored-by: Claude <noreply@anthropic.com>
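A minimal sketch of the hash-based source_id scheme described in the first commit above, assuming the ID is simply the first 16 hex characters of a SHA-256 over the canonical source URL (the exact inputs the real service hashes may differ):

import hashlib


def generate_source_id(source_url: str) -> str:
    """Derive a stable 16-character source_id from a canonical source URL.

    A 64-bit hash prefix makes collisions negligible for well under ~1M
    sources, per the commit notes above.
    """
    return hashlib.sha256(source_url.encode("utf-8")).hexdigest()[:16]


# The same canonical URL always yields the same ID, so concurrent crawls agree.
assert generate_source_id("https://example.com/docs") == generate_source_id("https://example.com/docs")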
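A sketch of the URL canonicalization rules listed above, using only the standard library; the function name and the exact tracking-parameter set are illustrative, not the project's actual code:

from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

TRACKING_PARAMS = {"gclid", "fbclid"}  # plus any utm_* parameter


def canonicalize_url(url: str) -> str:
    """Normalize a URL so trivially different variants map to one source."""
    parts = urlsplit(url)
    scheme = parts.scheme.lower()
    host = parts.hostname.lower() if parts.hostname else ""
    # Drop default ports (80 for HTTP, 443 for HTTPS)
    netloc = host
    if parts.port and not (
        (scheme == "http" and parts.port == 80)
        or (scheme == "https" and parts.port == 443)
    ):
        netloc = f"{host}:{parts.port}"
    # Remove tracking parameters and sort the rest for a stable ordering
    query_pairs = [
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if key not in TRACKING_PARAMS and not key.startswith("utm_")
    ]
    query = urlencode(sorted(query_pairs))
    # Strip the trailing slash except for the bare root path
    path = parts.path.rstrip("/") or "/"
    # Fragments are dropped entirely
    return urlunsplit((scheme, netloc, path, query, ""))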
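The offloading pattern that the test file below exercises, reduced to a self-contained sketch; extract_source_summary here is a hypothetical stand-in for the real synchronous helper, and the fallback string only mirrors the shape checked in the tests:

import asyncio
import time


def extract_source_summary(source_id: str, content: str) -> str:
    # Hypothetical stand-in for the synchronous AI call; it blocks for a while.
    time.sleep(0.1)
    return f"Summary for {source_id}"


async def summarize_source(source_id: str, combined_content: str) -> str:
    # Hand the blocking call to a worker thread so the event loop keeps
    # serving other tasks instead of stalling for the full sleep.
    try:
        return await asyncio.to_thread(extract_source_summary, source_id, combined_content)
    except Exception:
        # Fallback mirrors the error-handling behavior the tests verify.
        return f"Documentation from {source_id}"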
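And a sketch of the upsert-based source creation fix; the call shape follows the Supabase Python client's query builder, but the table name and record fields are assumptions:

def create_or_update_source(client, source_record: dict) -> None:
    # Upsert instead of plain insert: if two concurrent crawls race to create
    # the same source_id, the losing writer updates the existing row rather
    # than failing with a PRIMARY KEY violation.
    client.table("sources").upsert(source_record).execute()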
"""
|
|
Test async execution of extract_source_summary and update_source_info.
|
|
|
|
This test ensures that synchronous functions extract_source_summary and
|
|
update_source_info are properly executed in thread pools to avoid blocking
|
|
the async event loop.
|
|
"""
|
|
|
|
import asyncio
|
|
import time
|
|
from unittest.mock import Mock, AsyncMock, patch
|
|
import pytest
|
|
|
|
from src.server.services.crawling.document_storage_operations import DocumentStorageOperations
|
|
|
|
|
|
class TestAsyncSourceSummary:
    """Test that extract_source_summary and update_source_info don't block the async event loop."""

    @pytest.mark.asyncio
    async def test_extract_summary_runs_in_thread(self):
        """Test that extract_source_summary is executed in a thread pool."""
        # Create mock supabase client
        mock_supabase = Mock()
        mock_supabase.table.return_value.upsert.return_value.execute.return_value = Mock()

        doc_storage = DocumentStorageOperations(mock_supabase)

        # Track when extract_source_summary is called
        summary_call_times = []
        original_summary_result = "Test summary from AI"

        def slow_extract_summary(source_id, content):
            """Simulate a slow synchronous function that would block the event loop."""
            summary_call_times.append(time.time())
            # Simulate a blocking operation (like an API call)
            time.sleep(0.1)  # This would block the event loop if not run in thread
            return original_summary_result

        # Mock the storage service
        doc_storage.doc_storage_service.smart_chunk_text = Mock(
            return_value=["chunk1", "chunk2"]
        )

        with patch('src.server.services.crawling.document_storage_operations.extract_source_summary',
                   side_effect=slow_extract_summary):
            with patch('src.server.services.crawling.document_storage_operations.update_source_info'):
                with patch('src.server.services.crawling.document_storage_operations.safe_logfire_info'):
                    with patch('src.server.services.crawling.document_storage_operations.safe_logfire_error'):
                        # Create test metadata
                        all_metadatas = [
                            {"source_id": "test123", "word_count": 100},
                            {"source_id": "test123", "word_count": 150},
                        ]
                        all_contents = ["chunk1", "chunk2"]
                        source_word_counts = {"test123": 250}
                        request = {"knowledge_type": "documentation"}

                        # Track async execution
                        start_time = time.time()

                        # This should not block despite the sleep in extract_summary
                        await doc_storage._create_source_records(
                            all_metadatas,
                            all_contents,
                            source_word_counts,
                            request,
                            "https://example.com",
                            "Example Site"
                        )

                        end_time = time.time()

                        # Verify that extract_source_summary was called
                        assert len(summary_call_times) == 1, "extract_source_summary should be called once"

                        # The async function should complete without blocking.
                        # Even though extract_summary sleeps for 0.1s, the async function
                        # should not be blocked since it runs in a thread
                        total_time = end_time - start_time

                        # We can't guarantee exact timing, but it should complete
                        # without throwing a timeout error
                        assert total_time < 1.0, "Should complete in reasonable time"

    @pytest.mark.asyncio
    async def test_extract_summary_error_handling(self):
        """Test that errors in extract_source_summary are handled correctly."""
        mock_supabase = Mock()
        mock_supabase.table.return_value.upsert.return_value.execute.return_value = Mock()

        doc_storage = DocumentStorageOperations(mock_supabase)

        # Mock to raise an exception
        def failing_extract_summary(source_id, content):
            raise RuntimeError("AI service unavailable")

        doc_storage.doc_storage_service.smart_chunk_text = Mock(
            return_value=["chunk1"]
        )

        error_messages = []

        with patch('src.server.services.crawling.document_storage_operations.extract_source_summary',
                   side_effect=failing_extract_summary):
            with patch('src.server.services.crawling.document_storage_operations.update_source_info') as mock_update:
                with patch('src.server.services.crawling.document_storage_operations.safe_logfire_info'):
                    with patch('src.server.services.crawling.document_storage_operations.safe_logfire_error') as mock_error:
                        mock_error.side_effect = lambda msg: error_messages.append(msg)

                        all_metadatas = [{"source_id": "test456", "word_count": 100}]
                        all_contents = ["chunk1"]
                        source_word_counts = {"test456": 100}
                        request = {}

                        await doc_storage._create_source_records(
                            all_metadatas,
                            all_contents,
                            source_word_counts,
                            request,
                            None,
                            None
                        )

                        # Verify error was logged
                        assert len(error_messages) == 1
                        assert "Failed to generate AI summary" in error_messages[0]
                        assert "AI service unavailable" in error_messages[0]

                        # Verify fallback summary was used
                        mock_update.assert_called_once()
                        call_args = mock_update.call_args
                        assert call_args.kwargs["summary"] == "Documentation from test456 - 1 pages crawled"

    @pytest.mark.asyncio
    async def test_multiple_sources_concurrent_summaries(self):
        """Test that multiple source summaries are generated concurrently."""
        mock_supabase = Mock()
        mock_supabase.table.return_value.upsert.return_value.execute.return_value = Mock()

        doc_storage = DocumentStorageOperations(mock_supabase)

        # Track concurrent executions
        execution_order = []

        def track_extract_summary(source_id, content):
            execution_order.append(f"start_{source_id}")
            time.sleep(0.05)  # Simulate work
            execution_order.append(f"end_{source_id}")
            return f"Summary for {source_id}"

        doc_storage.doc_storage_service.smart_chunk_text = Mock(
            return_value=["chunk"]
        )

        with patch('src.server.services.crawling.document_storage_operations.extract_source_summary',
                   side_effect=track_extract_summary):
            with patch('src.server.services.crawling.document_storage_operations.update_source_info'):
                with patch('src.server.services.crawling.document_storage_operations.safe_logfire_info'):
                    # Create metadata for multiple sources
                    all_metadatas = [
                        {"source_id": "source1", "word_count": 100},
                        {"source_id": "source2", "word_count": 150},
                        {"source_id": "source3", "word_count": 200},
                    ]
                    all_contents = ["chunk1", "chunk2", "chunk3"]
                    source_word_counts = {
                        "source1": 100,
                        "source2": 150,
                        "source3": 200,
                    }
                    request = {}

                    await doc_storage._create_source_records(
                        all_metadatas,
                        all_contents,
                        source_word_counts,
                        request,
                        None,
                        None
                    )

                    # With threading, sources are processed sequentially in the loop
                    # but the extract_summary calls happen in threads
                    assert len(execution_order) == 6  # 3 sources * 2 events each

                    # Verify all sources were processed
                    processed_sources = set()
                    for event in execution_order:
                        if event.startswith("start_"):
                            processed_sources.add(event.replace("start_", ""))

                    assert processed_sources == {"source1", "source2", "source3"}

    @pytest.mark.asyncio
    async def test_thread_safety_with_variables(self):
        """Test that variables are properly passed to thread execution."""
        mock_supabase = Mock()
        mock_supabase.table.return_value.upsert.return_value.execute.return_value = Mock()

        doc_storage = DocumentStorageOperations(mock_supabase)

        # Track what gets passed to extract_summary
        captured_calls = []

        def capture_extract_summary(source_id, content):
            captured_calls.append({
                "source_id": source_id,
                "content_len": len(content),
                "content_preview": content[:50] if content else ""
            })
            return f"Summary for {source_id}"

        doc_storage.doc_storage_service.smart_chunk_text = Mock(
            return_value=["This is chunk one with some content",
                          "This is chunk two with more content"]
        )

        with patch('src.server.services.crawling.document_storage_operations.extract_source_summary',
                   side_effect=capture_extract_summary):
            with patch('src.server.services.crawling.document_storage_operations.update_source_info'):
                with patch('src.server.services.crawling.document_storage_operations.safe_logfire_info'):
                    all_metadatas = [
                        {"source_id": "test789", "word_count": 100},
                        {"source_id": "test789", "word_count": 150},
                    ]
                    all_contents = [
                        "This is chunk one with some content",
                        "This is chunk two with more content"
                    ]
                    source_word_counts = {"test789": 250}
                    request = {}

                    await doc_storage._create_source_records(
                        all_metadatas,
                        all_contents,
                        source_word_counts,
                        request,
                        None,
                        None
                    )

                    # Verify the correct values were passed to the thread
                    assert len(captured_calls) == 1
                    call = captured_calls[0]
                    assert call["source_id"] == "test789"
                    assert call["content_len"] > 0
                    # Combined content should start with space + first chunk
                    assert "This is chunk one" in call["content_preview"]

    @pytest.mark.asyncio
    async def test_update_source_info_runs_in_thread(self):
        """Test that update_source_info is executed in a thread pool."""
        mock_supabase = Mock()
        mock_supabase.table.return_value.upsert.return_value.execute.return_value = Mock()

        doc_storage = DocumentStorageOperations(mock_supabase)

        # Track when update_source_info is called
        update_call_times = []

        def slow_update_source_info(**kwargs):
            """Simulate a slow synchronous database operation."""
            update_call_times.append(time.time())
            # Simulate a blocking database operation
            time.sleep(0.1)  # This would block the event loop if not run in thread
            return None  # update_source_info doesn't return anything

        doc_storage.doc_storage_service.smart_chunk_text = Mock(
            return_value=["chunk1"]
        )

        with patch('src.server.services.crawling.document_storage_operations.extract_source_summary',
                   return_value="Test summary"):
            with patch('src.server.services.crawling.document_storage_operations.update_source_info',
                       side_effect=slow_update_source_info):
                with patch('src.server.services.crawling.document_storage_operations.safe_logfire_info'):
                    with patch('src.server.services.crawling.document_storage_operations.safe_logfire_error'):
                        all_metadatas = [{"source_id": "test_update", "word_count": 100}]
                        all_contents = ["chunk1"]
                        source_word_counts = {"test_update": 100}
                        request = {"knowledge_type": "documentation", "tags": ["test"]}

                        start_time = time.time()

                        # This should not block despite the sleep in update_source_info
                        await doc_storage._create_source_records(
                            all_metadatas,
                            all_contents,
                            source_word_counts,
                            request,
                            "https://example.com",
                            "Example Site"
                        )

                        end_time = time.time()

                        # Verify that update_source_info was called
                        assert len(update_call_times) == 1, "update_source_info should be called once"

                        # The async function should complete without blocking
                        total_time = end_time - start_time
                        assert total_time < 1.0, "Should complete in reasonable time"

    @pytest.mark.asyncio
    async def test_update_source_info_error_handling(self):
        """Test that errors in update_source_info trigger fallback correctly."""
        mock_supabase = Mock()
        mock_supabase.table.return_value.upsert.return_value.execute.return_value = Mock()

        doc_storage = DocumentStorageOperations(mock_supabase)

        # Mock to raise an exception
        def failing_update_source_info(**kwargs):
            raise RuntimeError("Database connection failed")

        doc_storage.doc_storage_service.smart_chunk_text = Mock(
            return_value=["chunk1"]
        )

        error_messages = []
        fallback_called = False

        def track_fallback_upsert(data):
            nonlocal fallback_called
            fallback_called = True
            return Mock(execute=Mock())

        mock_supabase.table.return_value.upsert.side_effect = track_fallback_upsert

        with patch('src.server.services.crawling.document_storage_operations.extract_source_summary',
                   return_value="Test summary"):
            with patch('src.server.services.crawling.document_storage_operations.update_source_info',
                       side_effect=failing_update_source_info):
                with patch('src.server.services.crawling.document_storage_operations.safe_logfire_info'):
                    with patch('src.server.services.crawling.document_storage_operations.safe_logfire_error') as mock_error:
                        mock_error.side_effect = lambda msg: error_messages.append(msg)

                        all_metadatas = [{"source_id": "test_fail", "word_count": 100}]
                        all_contents = ["chunk1"]
                        source_word_counts = {"test_fail": 100}
                        request = {"knowledge_type": "technical", "tags": ["test"]}

                        await doc_storage._create_source_records(
                            all_metadatas,
                            all_contents,
                            source_word_counts,
                            request,
                            "https://example.com",
                            "Example Site"
                        )

                        # Verify error was logged
                        assert any("Failed to create/update source record" in msg for msg in error_messages)
                        assert any("Database connection failed" in msg for msg in error_messages)

                        # Verify fallback was attempted
                        assert fallback_called, "Fallback upsert should be called"

    @pytest.mark.asyncio
    async def test_update_source_info_preserves_kwargs(self):
        """Test that all kwargs are properly passed to update_source_info in thread."""
        mock_supabase = Mock()
        mock_supabase.table.return_value.upsert.return_value.execute.return_value = Mock()

        doc_storage = DocumentStorageOperations(mock_supabase)

        # Track what gets passed to update_source_info
        captured_kwargs = {}

        def capture_update_source_info(**kwargs):
            captured_kwargs.update(kwargs)
            return None

        doc_storage.doc_storage_service.smart_chunk_text = Mock(
            return_value=["chunk content"]
        )

        with patch('src.server.services.crawling.document_storage_operations.extract_source_summary',
                   return_value="Generated summary"):
            with patch('src.server.services.crawling.document_storage_operations.update_source_info',
                       side_effect=capture_update_source_info):
                with patch('src.server.services.crawling.document_storage_operations.safe_logfire_info'):
                    all_metadatas = [{"source_id": "test_kwargs", "word_count": 250}]
                    all_contents = ["chunk content"]
                    source_word_counts = {"test_kwargs": 250}
                    request = {
                        "knowledge_type": "api_reference",
                        "tags": ["api", "docs"],
                        "url": "https://original.url/crawl"
                    }

                    await doc_storage._create_source_records(
                        all_metadatas,
                        all_contents,
                        source_word_counts,
                        request,
                        "https://source.url",
                        "Source Display Name"
                    )

                    # Verify all kwargs were passed correctly
                    assert captured_kwargs["client"] == mock_supabase
                    assert captured_kwargs["source_id"] == "test_kwargs"
                    assert captured_kwargs["summary"] == "Generated summary"
                    assert captured_kwargs["word_count"] == 250
                    assert "chunk content" in captured_kwargs["content"]
                    assert captured_kwargs["knowledge_type"] == "api_reference"
                    assert captured_kwargs["tags"] == ["api", "docs"]
                    assert captured_kwargs["update_frequency"] == 0
                    assert captured_kwargs["original_url"] == "https://original.url/crawl"
                    assert captured_kwargs["source_url"] == "https://source.url"
                    assert captured_kwargs["source_display_name"] == "Source Display Name"