Coverage for src / augint_library / resilience.py: 95%
121 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 20:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 20:22 +0000
1"""Resilience patterns for handling transient failures.
3This module provides production-ready retry and circuit breaker patterns for
4building fault-tolerant systems. It demonstrates:
5- Retry with exponential backoff and jitter
6- Circuit breaker pattern to prevent cascading failures
7- Thread-safe implementations
8- Monitoring and observability hooks
10Common Use Cases:
11 1. External API calls that may fail transiently
12 2. Database operations during high load
13 3. Microservice communication
14 4. Preventing thundering herd problems
15 5. Graceful degradation of services
17Examples:
18 Basic retry pattern:
19 >>> from augint_library.resilience import retry
20 >>>
21 >>> @retry(max_attempts=3, initial_delay=1.0)
22 >>> def call_flaky_api():
23 ... # This might fail occasionally
24 ... response = requests.get("https://api.example.com/data")
25 ... return response.json()
27 Circuit breaker for cascading failure prevention:
28 >>> from augint_library.resilience import circuit_breaker
29 >>>
30 >>> @circuit_breaker(failure_threshold=5, recovery_timeout=60)
31 >>> def call_payment_service(amount):
32 ... # If this fails 5 times, circuit opens for 60 seconds
33 ... return payment_api.charge(amount)
35 Combining patterns for maximum resilience:
36 >>> @circuit_breaker(failure_threshold=5, recovery_timeout=60)
37 >>> @retry(max_attempts=3, initial_delay=0.5)
38 >>> def robust_api_call(endpoint):
39 ... # Circuit breaker prevents overwhelming a failing service
40 ... # Retry handles transient network issues
41 ... return fetch_data(endpoint)
43 Monitoring circuit breaker state:
44 >>> from augint_library.resilience import get_circuit_breakers
45 >>>
46 >>> # Check all circuit breakers
47 >>> states = get_circuit_breakers()
48 >>> for name, state in states.items():
49 ... print(f"{name}: {state['state']} (failures: {state['failure_count']})")
51 Handling circuit breaker exceptions:
52 >>> from augint_library.resilience import CircuitOpenError
53 >>>
54 >>> try:
55 ... result = call_payment_service(100)
56 ... except CircuitOpenError as e:
57 ... # Circuit is open, use fallback
58 ... logger.warning(f"Payment service down: {e}")
59 ... return {"status": "queued", "retry_after": e.details['retry_after']}
61Best Practices:
62 1. Always set reasonable timeouts on retried operations
63 2. Use jitter to prevent thundering herd
64 3. Monitor circuit breaker states
65 4. Have fallback strategies when circuits open
66 5. Log all retry attempts and circuit state changes
68Configuration Guidelines:
69 Retry:
70 - max_attempts: 3-5 for user-facing, up to 10 for background
71 - initial_delay: 0.1-1.0 seconds
72 - exponential_base: 2.0 (doubles each time)
73 - jitter: True (prevents synchronized retries)
75 Circuit Breaker:
76 - failure_threshold: 5-10 consecutive failures
77 - recovery_timeout: 30-300 seconds
78 - success_threshold: 2-5 successes to close
80Advanced Patterns:
81 Custom retry conditions:
82 >>> @retry(max_attempts=3, retryable_exceptions=(TimeoutError, ConnectionError))
83 >>> def network_operation():
84 ... # Only retries on specific network errors
85 ... pass
87 Circuit breaker with half-open testing:
88 >>> @circuit_breaker(
89 ... failure_threshold=5,
90 ... recovery_timeout=60,
91 ... success_threshold=3 # Need 3 successes to fully close
92 ... )
93 >>> def gradual_recovery_service():
94 ... pass
96Note:
97 These patterns are thread-safe but not process-safe. For distributed
98 systems, consider using Redis-backed circuit breakers or service mesh
99 features like Istio/Envoy.
100"""
102import random
103import time
104from dataclasses import dataclass
105from enum import Enum
106from functools import wraps
107from threading import Lock
108from typing import Any, Callable, Optional
110from .constants import (
111 DEFAULT_EXPONENTIAL_BASE,
112 DEFAULT_FAILURE_THRESHOLD,
113 DEFAULT_INITIAL_DELAY,
114 DEFAULT_MAX_DELAY,
115 DEFAULT_RECOVERY_TIMEOUT,
116 DEFAULT_RETRY_ATTEMPTS,
117 DEFAULT_SUCCESS_THRESHOLD,
118 JITTER_MAX_FACTOR,
119 JITTER_MIN_FACTOR,
120)
121from .exceptions import AugintError, ErrorCode
124class CircuitState(Enum):
125 """States for the circuit breaker pattern."""
127 CLOSED = "closed" # Normal operation, requests pass through
128 OPEN = "open" # Failures exceeded threshold, requests are blocked
129 HALF_OPEN = "half_open" # Testing if service has recovered
132@dataclass
133class RetryConfig:
134 """Configuration for retry behavior."""
136 max_attempts: int = DEFAULT_RETRY_ATTEMPTS
137 initial_delay: float = DEFAULT_INITIAL_DELAY
138 max_delay: float = DEFAULT_MAX_DELAY
139 exponential_base: float = DEFAULT_EXPONENTIAL_BASE
140 jitter: bool = True
141 retryable_exceptions: tuple[type[Exception], ...] = (Exception,)
144@dataclass
145class CircuitBreakerConfig:
146 """Configuration for circuit breaker behavior."""
148 failure_threshold: int = DEFAULT_FAILURE_THRESHOLD
149 recovery_timeout: float = DEFAULT_RECOVERY_TIMEOUT
150 success_threshold: int = DEFAULT_SUCCESS_THRESHOLD
151 expected_exception: type[Exception] = Exception
154class CircuitOpenError(AugintError):
155 """Raised when circuit breaker is open and rejecting requests."""
157 def __init__(self, service: str, recovery_time: float) -> None:
158 """Initialize CircuitOpenError."""
159 super().__init__(
160 f"Circuit breaker is OPEN for {service}",
161 code=ErrorCode.NETWORK_ERROR,
162 details={
163 "service": service,
164 "recovery_in": f"{recovery_time:.1f}s",
165 "state": CircuitState.OPEN.value,
166 },
167 )
170class CircuitBreaker:
171 """Circuit breaker implementation to prevent cascading failures."""
173 def __init__(self, name: str, config: CircuitBreakerConfig):
174 """Initialize circuit breaker."""
175 self.name = name
176 self.config = config
177 self.state = CircuitState.CLOSED
178 self.failure_count = 0
179 self.success_count = 0
180 self.last_failure_time: Optional[float] = None
181 self._lock = Lock()
183 def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
184 """Execute function through circuit breaker."""
185 with self._lock:
186 # Check if we should transition from OPEN to HALF_OPEN
187 if self.state == CircuitState.OPEN:
188 if (
189 self.last_failure_time
190 and time.time() - self.last_failure_time >= self.config.recovery_timeout
191 ):
192 self.state = CircuitState.HALF_OPEN
193 self.success_count = 0
194 else:
195 recovery_time = self.config.recovery_timeout - (
196 time.time() - (self.last_failure_time or 0)
197 )
198 raise CircuitOpenError(self.name, recovery_time)
200 try:
201 result = func(*args, **kwargs)
202 except self.config.expected_exception:
203 self._on_failure()
204 raise
205 else:
206 self._on_success()
207 return result
209 def _on_success(self) -> None:
210 """Handle successful call."""
211 with self._lock:
212 self.failure_count = 0
214 if self.state == CircuitState.HALF_OPEN:
215 self.success_count += 1
216 if self.success_count >= self.config.success_threshold:
217 self.state = CircuitState.CLOSED
219 def _on_failure(self) -> None:
220 """Handle failed call."""
221 with self._lock:
222 self.failure_count += 1
223 self.last_failure_time = time.time()
225 if self.state == CircuitState.HALF_OPEN or (
226 self.state == CircuitState.CLOSED
227 and self.failure_count >= self.config.failure_threshold
228 ):
229 self.state = CircuitState.OPEN
231 def get_state(self) -> dict[str, Any]:
232 """Get current circuit breaker state."""
233 with self._lock:
234 return {
235 "name": self.name,
236 "state": self.state.value,
237 "failure_count": self.failure_count,
238 "success_count": self.success_count,
239 "last_failure": self.last_failure_time,
240 }
243# Global registry for circuit breakers (for CLI inspection)
244_circuit_breakers: dict[str, CircuitBreaker] = {}
245_registry_lock = Lock()
248def retry(
249 max_attempts: int = DEFAULT_RETRY_ATTEMPTS,
250 initial_delay: float = DEFAULT_INITIAL_DELAY,
251 max_delay: float = DEFAULT_MAX_DELAY,
252 exponential_base: float = DEFAULT_EXPONENTIAL_BASE,
253 jitter: bool = True,
254 retryable_exceptions: tuple[type[Exception], ...] = (Exception,),
255) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
256 """Decorator for retrying functions with exponential backoff.
258 Args:
259 max_attempts: Maximum number of attempts.
260 initial_delay: Initial delay between retries in seconds.
261 max_delay: Maximum delay between retries.
262 exponential_base: Base for exponential backoff.
263 jitter: Whether to add random jitter to delays.
264 retryable_exceptions: Tuple of exceptions to retry on.
266 Returns:
267 Decorated function that implements retry logic.
269 Example:
270 >>> @retry(max_attempts=3, initial_delay=1.0)
271 ... def flaky_operation():
272 ... # This might fail transiently
273 ... return fetch_external_data()
274 """
275 config = RetryConfig(
276 max_attempts=max_attempts,
277 initial_delay=initial_delay,
278 max_delay=max_delay,
279 exponential_base=exponential_base,
280 jitter=jitter,
281 retryable_exceptions=retryable_exceptions,
282 )
284 def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
285 @wraps(func)
286 def wrapper(*args: Any, **kwargs: Any) -> Any:
287 last_exception: Optional[Exception] = None
289 for attempt in range(config.max_attempts): 289 ↛ 311line 289 didn't jump to line 311 because the loop on line 289 didn't complete
290 try:
291 return func(*args, **kwargs)
292 except config.retryable_exceptions as e:
293 last_exception = e
295 if attempt == config.max_attempts - 1:
296 # Last attempt, re-raise
297 raise
299 # Calculate delay with exponential backoff
300 delay = min(
301 config.initial_delay * (config.exponential_base**attempt), config.max_delay
302 )
304 # Add jitter to prevent thundering herd
305 if config.jitter:
306 delay *= JITTER_MIN_FACTOR + (random.random() * JITTER_MAX_FACTOR) # noqa: S311 - jitter for backoff
308 time.sleep(delay)
310 # Should never reach here, but just in case
311 if last_exception:
312 raise last_exception
313 return None
315 return wrapper
317 return decorator
320def circuit_breaker(
321 failure_threshold: int = DEFAULT_FAILURE_THRESHOLD,
322 recovery_timeout: float = DEFAULT_RECOVERY_TIMEOUT,
323 success_threshold: int = DEFAULT_SUCCESS_THRESHOLD,
324 expected_exception: type[Exception] = Exception,
325 name: Optional[str] = None,
326) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
327 """Decorator for circuit breaker pattern.
329 Args:
330 failure_threshold: Number of failures before opening circuit.
331 recovery_timeout: Time in seconds before attempting recovery.
332 success_threshold: Successes needed to close circuit from half-open.
333 expected_exception: Exception type that indicates failure.
334 name: Optional name for the circuit breaker.
336 Returns:
337 Decorated function with circuit breaker protection.
339 Example:
340 >>> @circuit_breaker(failure_threshold=5, recovery_timeout=60)
341 ... def call_external_service():
342 ... return requests.get("https://api.example.com/data")
343 """
344 config = CircuitBreakerConfig(
345 failure_threshold=failure_threshold,
346 recovery_timeout=recovery_timeout,
347 success_threshold=success_threshold,
348 expected_exception=expected_exception,
349 )
351 def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
352 # Use function name as circuit breaker name if not provided
353 cb_name = name or f"{func.__module__}.{func.__name__}"
355 # Get or create circuit breaker
356 with _registry_lock:
357 if cb_name not in _circuit_breakers: 357 ↛ 359line 357 didn't jump to line 359 because the condition on line 357 was always true
358 _circuit_breakers[cb_name] = CircuitBreaker(cb_name, config)
359 cb = _circuit_breakers[cb_name]
361 @wraps(func)
362 def wrapper(*args: Any, **kwargs: Any) -> Any:
363 return cb.call(func, *args, **kwargs)
365 return wrapper
367 return decorator
370def get_circuit_breakers() -> dict[str, dict[str, Any]]:
371 """Get status of all circuit breakers.
373 Returns:
374 Dictionary mapping circuit breaker names to their states.
375 """
376 with _registry_lock:
377 return {name: cb.get_state() for name, cb in _circuit_breakers.items()}
380def reset_circuit_breaker(name: str) -> bool:
381 """Reset a specific circuit breaker.
383 Args:
384 name: Name of the circuit breaker to reset.
386 Returns:
387 True if reset was successful, False if breaker not found.
388 """
389 with _registry_lock:
390 if name in _circuit_breakers:
391 cb = _circuit_breakers[name]
392 with cb._lock:
393 cb.state = CircuitState.CLOSED
394 cb.failure_count = 0
395 cb.success_count = 0
396 cb.last_failure_time = None
397 return True
398 return False