diff --git a/python/src/server/services/threading_service.py b/python/src/server/services/threading_service.py index 5db8ae4..b3a0053 100644 --- a/python/src/server/services/threading_service.py +++ b/python/src/server/services/threading_service.py @@ -98,8 +98,10 @@ class RateLimiter: if wait_time > 0: logfire_logger.info( f"Rate limiting: waiting {wait_time:.1f}s", - tokens=estimated_tokens, - current_usage=self._get_current_usage(), + extra={ + "tokens": estimated_tokens, + "current_usage": self._get_current_usage(), + } ) await asyncio.sleep(wait_time) return await self.acquire(estimated_tokens) @@ -199,16 +201,20 @@ class MemoryAdaptiveDispatcher: workers = max(1, base // 2) logfire_logger.warning( "High memory usage detected, reducing workers", - memory_percent=metrics.memory_percent, - workers=workers, + extra={ + "memory_percent": metrics.memory_percent, + "workers": workers, + } ) elif metrics.cpu_percent > self.config.cpu_threshold * 100: # Reduce workers when CPU is high workers = max(1, base // 2) logfire_logger.warning( "High CPU usage detected, reducing workers", - cpu_percent=metrics.cpu_percent, - workers=workers, + extra={ + "cpu_percent": metrics.cpu_percent, + "workers": workers, + } ) elif metrics.memory_percent < 50 and metrics.cpu_percent < 50: # Increase workers when resources are available @@ -239,11 +245,13 @@ class MemoryAdaptiveDispatcher: logfire_logger.info( "Starting adaptive processing", - items_count=len(items), - workers=optimal_workers, - mode=mode, - memory_percent=self.last_metrics.memory_percent, - cpu_percent=self.last_metrics.cpu_percent, + extra={ + "items_count": len(items), + "workers": optimal_workers, + "mode": mode, + "memory_percent": self.last_metrics.memory_percent, + "cpu_percent": self.last_metrics.cpu_percent, + } ) # Track active workers @@ -318,7 +326,8 @@ class MemoryAdaptiveDispatcher: del active_workers[worker_id] logfire_logger.error( - f"Processing failed for item {index}", error=str(e), item_index=index + f"Processing failed for item {index}", + extra={"error": str(e), "item_index": index} ) return None @@ -334,10 +343,12 @@ class MemoryAdaptiveDispatcher: success_rate = len(successful_results) / len(items) * 100 logfire_logger.info( "Adaptive processing completed", - total_items=len(items), - successful=len(successful_results), - success_rate=f"{success_rate:.1f}%", - workers_used=optimal_workers, + extra={ + "total_items": len(items), + "successful": len(successful_results), + "success_rate": f"{success_rate:.1f}%", + "workers_used": optimal_workers, + } ) return successful_results @@ -355,7 +366,8 @@ class WebSocketSafeProcessor: await websocket.accept() self.active_connections.append(websocket) logfire_logger.info( - "WebSocket client connected", total_connections=len(self.active_connections) + "WebSocket client connected", + extra={"total_connections": len(self.active_connections)} ) def disconnect(self, websocket: WebSocket): @@ -363,7 +375,8 @@ class WebSocketSafeProcessor: if websocket in self.active_connections: self.active_connections.remove(websocket) logfire_logger.info( - "WebSocket client disconnected", remaining_connections=len(self.active_connections) + "WebSocket client disconnected", + extra={"remaining_connections": len(self.active_connections)} ) async def broadcast_progress(self, message: dict[str, Any]): @@ -564,17 +577,20 @@ class ThreadingService: # Log system metrics logfire_logger.info( "System health check", - memory_percent=metrics.memory_percent, - cpu_percent=metrics.cpu_percent, - available_memory_gb=metrics.available_memory_gb, - active_threads=metrics.active_threads, - active_websockets=len(self.websocket_processor.active_connections), + extra={ + "memory_percent": metrics.memory_percent, + "cpu_percent": metrics.cpu_percent, + "available_memory_gb": metrics.available_memory_gb, + "active_threads": metrics.active_threads, + "active_websockets": len(self.websocket_processor.active_connections), + } ) # Alert on critical thresholds if metrics.memory_percent > 90: logfire_logger.warning( - "Critical memory usage", memory_percent=metrics.memory_percent + "Critical memory usage", + extra={"memory_percent": metrics.memory_percent} ) # Force garbage collection gc.collect() @@ -588,8 +604,10 @@ class ThreadingService: if metrics.active_threads > self.config.max_workers * 3: logfire_logger.warning( "High thread count detected", - active_threads=metrics.active_threads, - max_expected=self.config.max_workers * 3, + extra={ + "active_threads": metrics.active_threads, + "max_expected": self.config.max_workers * 3, + } ) await asyncio.sleep(self.config.health_check_interval)