Complete logging fixes for all statements in threading service
Applied the extra parameter pattern to all remaining logging statements (11 more) to ensure consistency and prevent runtime errors when any code path is executed. This completes the fix for the entire file.
This commit is contained in:
parent
43d83a08d3
commit
468463997d
@ -98,8 +98,10 @@ class RateLimiter:
|
||||
if wait_time > 0:
|
||||
logfire_logger.info(
|
||||
f"Rate limiting: waiting {wait_time:.1f}s",
|
||||
tokens=estimated_tokens,
|
||||
current_usage=self._get_current_usage(),
|
||||
extra={
|
||||
"tokens": estimated_tokens,
|
||||
"current_usage": self._get_current_usage(),
|
||||
}
|
||||
)
|
||||
await asyncio.sleep(wait_time)
|
||||
return await self.acquire(estimated_tokens)
|
||||
@ -199,16 +201,20 @@ class MemoryAdaptiveDispatcher:
|
||||
workers = max(1, base // 2)
|
||||
logfire_logger.warning(
|
||||
"High memory usage detected, reducing workers",
|
||||
memory_percent=metrics.memory_percent,
|
||||
workers=workers,
|
||||
extra={
|
||||
"memory_percent": metrics.memory_percent,
|
||||
"workers": workers,
|
||||
}
|
||||
)
|
||||
elif metrics.cpu_percent > self.config.cpu_threshold * 100:
|
||||
# Reduce workers when CPU is high
|
||||
workers = max(1, base // 2)
|
||||
logfire_logger.warning(
|
||||
"High CPU usage detected, reducing workers",
|
||||
cpu_percent=metrics.cpu_percent,
|
||||
workers=workers,
|
||||
extra={
|
||||
"cpu_percent": metrics.cpu_percent,
|
||||
"workers": workers,
|
||||
}
|
||||
)
|
||||
elif metrics.memory_percent < 50 and metrics.cpu_percent < 50:
|
||||
# Increase workers when resources are available
|
||||
@ -239,11 +245,13 @@ class MemoryAdaptiveDispatcher:
|
||||
|
||||
logfire_logger.info(
|
||||
"Starting adaptive processing",
|
||||
items_count=len(items),
|
||||
workers=optimal_workers,
|
||||
mode=mode,
|
||||
memory_percent=self.last_metrics.memory_percent,
|
||||
cpu_percent=self.last_metrics.cpu_percent,
|
||||
extra={
|
||||
"items_count": len(items),
|
||||
"workers": optimal_workers,
|
||||
"mode": mode,
|
||||
"memory_percent": self.last_metrics.memory_percent,
|
||||
"cpu_percent": self.last_metrics.cpu_percent,
|
||||
}
|
||||
)
|
||||
|
||||
# Track active workers
|
||||
@ -318,7 +326,8 @@ class MemoryAdaptiveDispatcher:
|
||||
del active_workers[worker_id]
|
||||
|
||||
logfire_logger.error(
|
||||
f"Processing failed for item {index}", error=str(e), item_index=index
|
||||
f"Processing failed for item {index}",
|
||||
extra={"error": str(e), "item_index": index}
|
||||
)
|
||||
return None
|
||||
|
||||
@ -334,10 +343,12 @@ class MemoryAdaptiveDispatcher:
|
||||
success_rate = len(successful_results) / len(items) * 100
|
||||
logfire_logger.info(
|
||||
"Adaptive processing completed",
|
||||
total_items=len(items),
|
||||
successful=len(successful_results),
|
||||
success_rate=f"{success_rate:.1f}%",
|
||||
workers_used=optimal_workers,
|
||||
extra={
|
||||
"total_items": len(items),
|
||||
"successful": len(successful_results),
|
||||
"success_rate": f"{success_rate:.1f}%",
|
||||
"workers_used": optimal_workers,
|
||||
}
|
||||
)
|
||||
|
||||
return successful_results
|
||||
@ -355,7 +366,8 @@ class WebSocketSafeProcessor:
|
||||
await websocket.accept()
|
||||
self.active_connections.append(websocket)
|
||||
logfire_logger.info(
|
||||
"WebSocket client connected", total_connections=len(self.active_connections)
|
||||
"WebSocket client connected",
|
||||
extra={"total_connections": len(self.active_connections)}
|
||||
)
|
||||
|
||||
def disconnect(self, websocket: WebSocket):
|
||||
@ -363,7 +375,8 @@ class WebSocketSafeProcessor:
|
||||
if websocket in self.active_connections:
|
||||
self.active_connections.remove(websocket)
|
||||
logfire_logger.info(
|
||||
"WebSocket client disconnected", remaining_connections=len(self.active_connections)
|
||||
"WebSocket client disconnected",
|
||||
extra={"remaining_connections": len(self.active_connections)}
|
||||
)
|
||||
|
||||
async def broadcast_progress(self, message: dict[str, Any]):
|
||||
@ -564,17 +577,20 @@ class ThreadingService:
|
||||
# Log system metrics
|
||||
logfire_logger.info(
|
||||
"System health check",
|
||||
memory_percent=metrics.memory_percent,
|
||||
cpu_percent=metrics.cpu_percent,
|
||||
available_memory_gb=metrics.available_memory_gb,
|
||||
active_threads=metrics.active_threads,
|
||||
active_websockets=len(self.websocket_processor.active_connections),
|
||||
extra={
|
||||
"memory_percent": metrics.memory_percent,
|
||||
"cpu_percent": metrics.cpu_percent,
|
||||
"available_memory_gb": metrics.available_memory_gb,
|
||||
"active_threads": metrics.active_threads,
|
||||
"active_websockets": len(self.websocket_processor.active_connections),
|
||||
}
|
||||
)
|
||||
|
||||
# Alert on critical thresholds
|
||||
if metrics.memory_percent > 90:
|
||||
logfire_logger.warning(
|
||||
"Critical memory usage", memory_percent=metrics.memory_percent
|
||||
"Critical memory usage",
|
||||
extra={"memory_percent": metrics.memory_percent}
|
||||
)
|
||||
# Force garbage collection
|
||||
gc.collect()
|
||||
@ -588,8 +604,10 @@ class ThreadingService:
|
||||
if metrics.active_threads > self.config.max_workers * 3:
|
||||
logfire_logger.warning(
|
||||
"High thread count detected",
|
||||
active_threads=metrics.active_threads,
|
||||
max_expected=self.config.max_workers * 3,
|
||||
extra={
|
||||
"active_threads": metrics.active_threads,
|
||||
"max_expected": self.config.max_workers * 3,
|
||||
}
|
||||
)
|
||||
|
||||
await asyncio.sleep(self.config.health_check_interval)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user