Hotfix - crawls hanging after embedding rate limiting

This commit is contained in:
Cole Medin 2025-08-30 08:20:02 -05:00
parent 3e204b0be1
commit 96c84b5cf2
3 changed files with 89 additions and 23 deletions

View File

@@ -205,8 +205,25 @@ async def create_embeddings_batch(
batch_tokens = sum(len(text.split()) for text in batch) * 1.3 batch_tokens = sum(len(text.split()) for text in batch) * 1.3
total_tokens_used += batch_tokens total_tokens_used += batch_tokens
# Create rate limit progress callback if we have a progress callback
rate_limit_callback = None
if progress_callback or websocket:
async def rate_limit_callback(data: dict):
# Send heartbeat during rate limit wait
if progress_callback:
processed = result.success_count + result.failure_count
message = f"Rate limited: {data.get('message', 'Waiting...')}"
await progress_callback(message, (processed / len(texts)) * 100)
if websocket:
await websocket.send_json({
"type": "rate_limit_wait",
"message": data.get("message", "Rate limited, waiting..."),
"remaining_seconds": data.get("remaining_seconds", 0)
})
# Rate limit each batch # Rate limit each batch
async with threading_service.rate_limited_operation(batch_tokens): async with threading_service.rate_limited_operation(batch_tokens, rate_limit_callback):
retry_count = 0 retry_count = 0
max_retries = 3 max_retries = 3

View File

@@ -233,9 +233,23 @@ async def add_documents_to_supabase(
# If not using contextual embeddings, use original contents # If not using contextual embeddings, use original contents
contextual_contents = batch_contents contextual_contents = batch_contents
# Create embeddings for the batch - no progress reporting # Create embeddings for the batch with rate limit progress support
# Don't pass websocket to avoid Socket.IO issues # Create a wrapper for progress callback to handle rate limiting updates
result = await create_embeddings_batch(contextual_contents, provider=provider) async def embedding_progress_wrapper(message: str, percentage: float):
# Forward rate limiting messages to the main progress callback
if progress_callback and "rate limit" in message.lower():
await progress_callback(
message,
current_percentage, # Use current batch progress
{"batch": batch_num, "type": "rate_limit_wait"}
)
# Pass progress callback for rate limiting updates
result = await create_embeddings_batch(
contextual_contents,
provider=provider,
progress_callback=embedding_progress_wrapper if progress_callback else None
)
# Log any failures # Log any failures
if result.has_failures: if result.has_failures:

View File

@@ -84,16 +84,30 @@ class RateLimiter:
self.semaphore = asyncio.Semaphore(config.max_concurrent) self.semaphore = asyncio.Semaphore(config.max_concurrent)
self._lock = asyncio.Lock() self._lock = asyncio.Lock()
async def acquire(self, estimated_tokens: int = 8000) -> bool: async def acquire(self, estimated_tokens: int = 8000, progress_callback: Callable | None = None) -> bool:
"""Acquire permission to make API call with token awareness""" """Acquire permission to make API call with token awareness
async with self._lock:
now = time.time() Args:
estimated_tokens: Estimated number of tokens for the operation
progress_callback: Optional async callback for progress updates during wait
"""
while True: # Loop instead of recursion to avoid stack overflow
wait_time_to_sleep = None
async with self._lock:
now = time.time()
# Clean old entries # Clean old entries
self._clean_old_entries(now) self._clean_old_entries(now)
# Check if we can make the request # Check if we can make the request
if not self._can_make_request(estimated_tokens): if self._can_make_request(estimated_tokens):
# Record the request
self.request_times.append(now)
self.token_usage.append((now, estimated_tokens))
return True
# Calculate wait time if we can't make the request
wait_time = self._calculate_wait_time(estimated_tokens) wait_time = self._calculate_wait_time(estimated_tokens)
if wait_time > 0: if wait_time > 0:
logfire_logger.info( logfire_logger.info(
@@ -103,14 +117,30 @@ class RateLimiter:
"current_usage": self._get_current_usage(), "current_usage": self._get_current_usage(),
} }
) )
await asyncio.sleep(wait_time) wait_time_to_sleep = wait_time
return await self.acquire(estimated_tokens) else:
return False return False
# Record the request # Sleep outside the lock to avoid deadlock
self.request_times.append(now) if wait_time_to_sleep is not None:
self.token_usage.append((now, estimated_tokens)) # For long waits, break into smaller chunks with progress updates
return True if wait_time_to_sleep > 5 and progress_callback:
chunks = int(wait_time_to_sleep / 5) # 5 second chunks
for i in range(chunks):
await asyncio.sleep(5)
remaining = wait_time_to_sleep - (i + 1) * 5
if progress_callback:
await progress_callback({
"type": "rate_limit_wait",
"remaining_seconds": max(0, remaining),
"message": f"waiting {max(0, remaining):.1f}s more..."
})
# Sleep any remaining time
if wait_time_to_sleep % 5 > 0:
await asyncio.sleep(wait_time_to_sleep % 5)
else:
await asyncio.sleep(wait_time_to_sleep)
# Continue the loop to try again
def _can_make_request(self, estimated_tokens: int) -> bool: def _can_make_request(self, estimated_tokens: int) -> bool:
"""Check if request can be made within limits""" """Check if request can be made within limits"""
@@ -510,10 +540,15 @@ class ThreadingService:
logfire_logger.info("Threading service stopped") logfire_logger.info("Threading service stopped")
@asynccontextmanager @asynccontextmanager
async def rate_limited_operation(self, estimated_tokens: int = 8000): async def rate_limited_operation(self, estimated_tokens: int = 8000, progress_callback: Callable | None = None):
"""Context manager for rate-limited operations""" """Context manager for rate-limited operations
Args:
estimated_tokens: Estimated number of tokens for the operation
progress_callback: Optional async callback for progress updates during wait
"""
async with self.rate_limiter.semaphore: async with self.rate_limiter.semaphore:
can_proceed = await self.rate_limiter.acquire(estimated_tokens) can_proceed = await self.rate_limiter.acquire(estimated_tokens, progress_callback)
if not can_proceed: if not can_proceed:
raise Exception("Rate limit exceeded") raise Exception("Rate limit exceeded")