Coverage for src / augint_library / resilience.py: 95%

121 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 20:22 +0000

1"""Resilience patterns for handling transient failures. 

2 

3This module provides production-ready retry and circuit breaker patterns for 

4building fault-tolerant systems. It demonstrates: 

5- Retry with exponential backoff and jitter 

6- Circuit breaker pattern to prevent cascading failures 

7- Thread-safe implementations 

8- Monitoring and observability hooks 

9 

10Common Use Cases: 

11 1. External API calls that may fail transiently 

12 2. Database operations during high load 

13 3. Microservice communication 

14 4. Preventing thundering herd problems 

15 5. Graceful degradation of services 

16 

17Examples: 

18 Basic retry pattern: 

19 >>> from augint_library.resilience import retry 

20 >>> 

21 >>> @retry(max_attempts=3, initial_delay=1.0) 

22 >>> def call_flaky_api(): 

23 ... # This might fail occasionally 

24 ... response = requests.get("https://api.example.com/data") 

25 ... return response.json() 

26 

27 Circuit breaker for cascading failure prevention: 

28 >>> from augint_library.resilience import circuit_breaker 

29 >>> 

30 >>> @circuit_breaker(failure_threshold=5, recovery_timeout=60) 

31 >>> def call_payment_service(amount): 

32 ... # If this fails 5 times, circuit opens for 60 seconds 

33 ... return payment_api.charge(amount) 

34 

35 Combining patterns for maximum resilience: 

36 >>> @circuit_breaker(failure_threshold=5, recovery_timeout=60) 

37 >>> @retry(max_attempts=3, initial_delay=0.5) 

38 >>> def robust_api_call(endpoint): 

39 ... # Circuit breaker prevents overwhelming a failing service 

40 ... # Retry handles transient network issues 

41 ... return fetch_data(endpoint) 

42 

43 Monitoring circuit breaker state: 

44 >>> from augint_library.resilience import get_circuit_breakers 

45 >>> 

46 >>> # Check all circuit breakers 

47 >>> states = get_circuit_breakers() 

48 >>> for name, state in states.items(): 

49 ... print(f"{name}: {state['state']} (failures: {state['failure_count']})") 

50 

51 Handling circuit breaker exceptions: 

52 >>> from augint_library.resilience import CircuitOpenError 

53 >>> 

54 >>> try: 

55 ... result = call_payment_service(100) 

56 ... except CircuitOpenError as e: 

57 ... # Circuit is open, use fallback 

58 ... logger.warning(f"Payment service down: {e}") 

59 ... return {"status": "queued", "retry_after": e.details['retry_after']} 

60 

61Best Practices: 

62 1. Always set reasonable timeouts on retried operations 

63 2. Use jitter to prevent thundering herd 

64 3. Monitor circuit breaker states 

65 4. Have fallback strategies when circuits open 

66 5. Log all retry attempts and circuit state changes 

67 

68Configuration Guidelines: 

69 Retry: 

70 - max_attempts: 3-5 for user-facing, up to 10 for background 

71 - initial_delay: 0.1-1.0 seconds 

72 - exponential_base: 2.0 (doubles each time) 

73 - jitter: True (prevents synchronized retries) 

74 

75 Circuit Breaker: 

76 - failure_threshold: 5-10 consecutive failures 

77 - recovery_timeout: 30-300 seconds 

78 - success_threshold: 2-5 successes to close 

79 

80Advanced Patterns: 

81 Custom retry conditions: 

82 >>> @retry(max_attempts=3, retryable_exceptions=(TimeoutError, ConnectionError)) 

83 >>> def network_operation(): 

84 ... # Only retries on specific network errors 

85 ... pass 

86 

87 Circuit breaker with half-open testing: 

88 >>> @circuit_breaker( 

89 ... failure_threshold=5, 

90 ... recovery_timeout=60, 

91 ... success_threshold=3 # Need 3 successes to fully close 

92 ... ) 

93 >>> def gradual_recovery_service(): 

94 ... pass 

95 

96Note: 

97 These patterns are thread-safe but not process-safe. For distributed 

98 systems, consider using Redis-backed circuit breakers or service mesh 

99 features like Istio/Envoy. 

100""" 

101 

102import random 

103import time 

104from dataclasses import dataclass 

105from enum import Enum 

106from functools import wraps 

107from threading import Lock 

108from typing import Any, Callable, Optional 

109 

110from .constants import ( 

111 DEFAULT_EXPONENTIAL_BASE, 

112 DEFAULT_FAILURE_THRESHOLD, 

113 DEFAULT_INITIAL_DELAY, 

114 DEFAULT_MAX_DELAY, 

115 DEFAULT_RECOVERY_TIMEOUT, 

116 DEFAULT_RETRY_ATTEMPTS, 

117 DEFAULT_SUCCESS_THRESHOLD, 

118 JITTER_MAX_FACTOR, 

119 JITTER_MIN_FACTOR, 

120) 

121from .exceptions import AugintError, ErrorCode 

122 

123 

class CircuitState(Enum):
    """Lifecycle states of a :class:`CircuitBreaker`.

    The string values are what ``CircuitBreaker.get_state()`` reports under
    the ``"state"`` key.
    """

    # Healthy: every call is forwarded to the protected function.
    CLOSED = "closed"
    # Tripped after too many failures: calls are rejected without running.
    OPEN = "open"
    # Probation after the recovery timeout: trial calls decide close vs. reopen.
    HALF_OPEN = "half_open"

130 

131 

@dataclass
class RetryConfig:
    """Settings consumed by the :func:`retry` decorator.

    Attributes:
        max_attempts: Total number of calls, including the first one.
        initial_delay: Backoff delay in seconds after the first failed attempt.
        max_delay: Cap applied to the exponential backoff delay (before jitter).
        exponential_base: Growth factor of the delay from one attempt to the next.
        jitter: Whether each delay is multiplied by a random factor so that
            many clients do not retry in lockstep.
        retryable_exceptions: Exception types that trigger another attempt;
            anything else propagates immediately.
    """

    # How many times the wrapped callable may be invoked in total.
    max_attempts: int = DEFAULT_RETRY_ATTEMPTS
    # Backoff schedule parameters (seconds / multiplier).
    initial_delay: float = DEFAULT_INITIAL_DELAY
    max_delay: float = DEFAULT_MAX_DELAY
    exponential_base: float = DEFAULT_EXPONENTIAL_BASE
    # Randomize delays to spread out retries.
    jitter: bool = True
    # Only these exception types are retried.
    retryable_exceptions: tuple[type[Exception], ...] = (Exception,)

142 

143 

@dataclass
class CircuitBreakerConfig:
    """Settings consumed by :class:`CircuitBreaker`.

    Attributes:
        failure_threshold: Number of failures (the count is reset by any
            success) after which a CLOSED circuit opens.
        recovery_timeout: Seconds an OPEN circuit waits after its last failure
            before admitting trial calls.
        success_threshold: Successful trial calls needed to close a HALF_OPEN
            circuit.
        expected_exception: Exception type counted as a failure; other
            exceptions propagate without affecting the breaker.
    """

    # Trip condition.
    failure_threshold: int = DEFAULT_FAILURE_THRESHOLD
    # Cool-down before probing, in seconds.
    recovery_timeout: float = DEFAULT_RECOVERY_TIMEOUT
    # Probation requirement before fully closing again.
    success_threshold: int = DEFAULT_SUCCESS_THRESHOLD
    # Which errors count against the breaker.
    expected_exception: type[Exception] = Exception

152 

153 

class CircuitOpenError(AugintError):
    """Raised when circuit breaker is open and rejecting requests.

    ``details`` carries:

    * ``service``: name of the circuit breaker that rejected the call.
    * ``recovery_in``: human-readable time until a trial call is admitted,
      e.g. ``"12.5s"``.
    * ``state``: always ``"open"``.
    * ``retry_after``: the same delay as a float number of seconds (never
      negative), convenient for scheduling a retry or a fallback.
    """

    def __init__(self, service: str, recovery_time: float) -> None:
        """Initialize CircuitOpenError.

        Args:
            service: Name of the circuit breaker that rejected the call.
            recovery_time: Seconds until the breaker will admit a trial call.
                Negative values are clamped to zero.
        """
        # A negative remaining time is meaningless to callers; report "now".
        retry_after = float(max(recovery_time, 0.0))
        super().__init__(
            f"Circuit breaker is OPEN for {service}",
            code=ErrorCode.NETWORK_ERROR,
            details={
                "service": service,
                "recovery_in": f"{retry_after:.1f}s",
                "state": CircuitState.OPEN.value,
                # Numeric form of "recovery_in", as documented for fallbacks.
                "retry_after": retry_after,
            },
        )

168 

169 

class CircuitBreaker:
    """Circuit breaker implementation to prevent cascading failures.

    The breaker is a small state machine; every state change happens while
    holding ``self._lock``:

    * ``CLOSED``: calls pass through. Each failure matching
      ``config.expected_exception`` increments ``failure_count`` and every
      success resets it to zero, so the circuit opens after
      ``config.failure_threshold`` consecutive failures.
    * ``OPEN``: calls are rejected with :class:`CircuitOpenError` until
      ``config.recovery_timeout`` seconds have passed since
      ``last_failure_time``; the first call after that moves to ``HALF_OPEN``.
    * ``HALF_OPEN``: calls pass through as trials. ``config.success_threshold``
      successes close the circuit; any counted failure reopens it.

    Exceptions that do not match ``config.expected_exception`` propagate
    without affecting the counters or the state.

    The wrapped function runs outside the lock, so calls are not serialized.
    NOTE(review): as a consequence ``HALF_OPEN`` does not limit how many trial
    calls run concurrently.
    """

    def __init__(self, name: str, config: CircuitBreakerConfig) -> None:
        """Initialize a circuit breaker in the CLOSED state.

        Args:
            name: Identifier reported in errors and state snapshots.
            config: Thresholds and timeout that drive state transitions.
        """
        self.name = name
        self.config = config
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        # time.time() timestamp of the most recent counted failure.
        self.last_failure_time: Optional[float] = None
        self._lock = Lock()

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Execute ``func(*args, **kwargs)`` through the circuit breaker.

        Args:
            func: Callable to protect.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitOpenError: If the circuit is OPEN and the recovery timeout
                has not elapsed yet; ``func`` is not called in that case.
            Exception: Any exception raised by ``func`` is re-raised unchanged.
        """
        self._admit_call()

        try:
            result = func(*args, **kwargs)
        except self.config.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _admit_call(self) -> None:
        """Reject the call while OPEN, or move OPEN -> HALF_OPEN once recovery is due.

        Raises:
            CircuitOpenError: If the recovery timeout has not elapsed yet.
        """
        with self._lock:
            if self.state != CircuitState.OPEN:
                return

            # Compare against None explicitly: 0.0 is a valid timestamp, and an
            # OPEN breaker without a recorded failure time must not stay open
            # forever with a nonsensical (hugely negative) recovery time.
            if self.last_failure_time is not None:
                elapsed = time.time() - self.last_failure_time
                remaining = self.config.recovery_timeout - elapsed
                if remaining > 0:
                    raise CircuitOpenError(self.name, remaining)

            # Recovery timeout elapsed (or nothing to wait for): admit trials.
            self.state = CircuitState.HALF_OPEN
            self.success_count = 0

    def _on_success(self) -> None:
        """Record a successful call.

        Resets the consecutive-failure count; in ``HALF_OPEN`` the success also
        counts toward ``config.success_threshold`` and closes the circuit once
        the threshold is reached.
        """
        with self._lock:
            self.failure_count = 0

            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.config.success_threshold:
                    self.state = CircuitState.CLOSED

    def _on_failure(self) -> None:
        """Record a failed call (one matching ``config.expected_exception``).

        Opens the circuit when a ``HALF_OPEN`` trial fails or when a ``CLOSED``
        breaker reaches ``config.failure_threshold`` failures. The failure time
        is refreshed in every state, so a late failure from a call that began
        before the circuit opened also restarts the recovery timeout.
        """
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.state == CircuitState.HALF_OPEN or (
                self.state == CircuitState.CLOSED
                and self.failure_count >= self.config.failure_threshold
            ):
                self.state = CircuitState.OPEN

    def get_state(self) -> dict[str, Any]:
        """Return a consistent snapshot of this breaker's state.

        Returns:
            Dict with ``name``, ``state`` (the :class:`CircuitState` value
            string), ``failure_count``, ``success_count`` and ``last_failure``
            (``time.time()`` timestamp of the last counted failure, or None).
        """
        with self._lock:
            return {
                "name": self.name,
                "state": self.state.value,
                "failure_count": self.failure_count,
                "success_count": self.success_count,
                "last_failure": self.last_failure_time,
            }

241 

242 

# Process-wide registry of named circuit breakers. It is populated by the
# circuit_breaker() decorator and read by get_circuit_breakers() and
# reset_circuit_breaker() (e.g. for CLI inspection); all of them hold
# _registry_lock while touching the dict.
_registry_lock = Lock()
_circuit_breakers: dict[str, CircuitBreaker] = {}

246 

247 

def retry(
    max_attempts: int = DEFAULT_RETRY_ATTEMPTS,
    initial_delay: float = DEFAULT_INITIAL_DELAY,
    max_delay: float = DEFAULT_MAX_DELAY,
    exponential_base: float = DEFAULT_EXPONENTIAL_BASE,
    jitter: bool = True,
    retryable_exceptions: tuple[type[Exception], ...] = (Exception,),
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator for retrying functions with exponential backoff.

    The decorated function is called up to ``max_attempts`` times. After a
    failed attempt ``n`` (0-based) that raised one of ``retryable_exceptions``,
    the wrapper sleeps for ``min(initial_delay * exponential_base ** n,
    max_delay)`` seconds; when ``jitter`` is enabled that delay is multiplied
    by a random factor in ``[JITTER_MIN_FACTOR, JITTER_MIN_FACTOR +
    JITTER_MAX_FACTOR)``. The jitter is applied after the ``max_delay`` cap,
    so a jittered delay can exceed ``max_delay`` if that range extends above 1.

    Exceptions not listed in ``retryable_exceptions`` propagate immediately.
    If the final attempt fails, its exception propagates unchanged.

    Args:
        max_attempts: Maximum number of attempts (at least 1).
        initial_delay: Delay in seconds after the first failed attempt
            (non-negative).
        max_delay: Upper bound for the backoff delay before jitter, in seconds
            (non-negative).
        exponential_base: Factor by which the delay grows per attempt
            (non-negative).
        jitter: Whether to add random jitter to delays.
        retryable_exceptions: Tuple of exceptions to retry on.

    Returns:
        Decorator that wraps a function with the retry logic.

    Raises:
        ValueError: If ``max_attempts`` is less than 1 or a delay parameter is
            negative. Raised by ``retry(...)`` itself, i.e. at decoration time.

    Example:
        >>> @retry(max_attempts=3, initial_delay=1.0)
        ... def flaky_operation():
        ...     # This might fail transiently
        ...     return fetch_external_data()
    """
    # Validate up front: with max_attempts < 1 the function would never run
    # (the wrapper silently returned None), and negative delays make
    # time.sleep() raise ValueError mid-retry, hiding the real failure.
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")
    for param_name, value in (
        ("initial_delay", initial_delay),
        ("max_delay", max_delay),
        ("exponential_base", exponential_base),
    ):
        if value < 0:
            raise ValueError(f"{param_name} must be non-negative, got {value}")

    config = RetryConfig(
        max_attempts=max_attempts,
        initial_delay=initial_delay,
        max_delay=max_delay,
        exponential_base=exponential_base,
        jitter=jitter,
        retryable_exceptions=retryable_exceptions,
    )

    def _backoff_delay(attempt: int) -> float:
        """Return the sleep time in seconds after failed attempt ``attempt`` (0-based)."""
        delay = min(config.initial_delay * (config.exponential_base**attempt), config.max_delay)
        if config.jitter:
            # Jitter prevents many clients from retrying in lockstep
            # (thundering herd); not security-sensitive, so `random` is fine.
            delay *= JITTER_MIN_FACTOR + (random.random() * JITTER_MAX_FACTOR)  # noqa: S311 - jitter for backoff
        return delay

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Every attempt except the last: swallow retryable errors and back off.
            for attempt in range(config.max_attempts - 1):
                try:
                    return func(*args, **kwargs)
                except config.retryable_exceptions:
                    time.sleep(_backoff_delay(attempt))
            # Final attempt: any exception propagates to the caller unchanged.
            return func(*args, **kwargs)

        return wrapper

    return decorator

318 

319 

def circuit_breaker(
    failure_threshold: int = DEFAULT_FAILURE_THRESHOLD,
    recovery_timeout: float = DEFAULT_RECOVERY_TIMEOUT,
    success_threshold: int = DEFAULT_SUCCESS_THRESHOLD,
    expected_exception: type[Exception] = Exception,
    name: Optional[str] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator for circuit breaker pattern.

    Breakers live in a module-level registry keyed by name and are created
    when the function is decorated. Functions decorated with the same
    ``name`` share one breaker; that breaker keeps the configuration it was
    first created with, and the arguments of later decorations are ignored.

    Args:
        failure_threshold: Number of failures before opening circuit.
        recovery_timeout: Time in seconds before attempting recovery.
        success_threshold: Successes needed to close circuit from half-open.
        expected_exception: Exception type that indicates failure.
        name: Optional registry name for the circuit breaker. Defaults to
            ``"<module>.<qualified name>"`` of the decorated function, so
            same-named methods of different classes get separate breakers.

    Returns:
        Decorated function with circuit breaker protection.

    Example:
        >>> @circuit_breaker(failure_threshold=5, recovery_timeout=60)
        ... def call_external_service():
        ...     return requests.get("https://api.example.com/data")
    """
    config = CircuitBreakerConfig(
        failure_threshold=failure_threshold,
        recovery_timeout=recovery_timeout,
        success_threshold=success_threshold,
        expected_exception=expected_exception,
    )

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if name:
            cb_name = name
        else:
            # Qualified name ("Client.fetch", "outer.<locals>.inner") rather
            # than __name__: with __name__, same-named methods of different
            # classes in one module would silently share a single breaker.
            qualname = getattr(func, "__qualname__", None) or func.__name__
            cb_name = f"{func.__module__}.{qualname}"

        # Get or create the breaker registered under this name.
        with _registry_lock:
            cb = _circuit_breakers.get(cb_name)
            if cb is None:
                cb = CircuitBreaker(cb_name, config)
                _circuit_breakers[cb_name] = cb

        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return cb.call(func, *args, **kwargs)

        return wrapper

    return decorator

368 

369 

def get_circuit_breakers() -> dict[str, dict[str, Any]]:
    """Snapshot every registered circuit breaker.

    The registry lock is held while the snapshot is built, so no breaker can
    be registered or reset through this module while it is being collected.

    Returns:
        Mapping of breaker name to the dict produced by
        ``CircuitBreaker.get_state()``.
    """
    snapshot: dict[str, dict[str, Any]] = {}
    with _registry_lock:
        for breaker_name, breaker in _circuit_breakers.items():
            snapshot[breaker_name] = breaker.get_state()
    return snapshot

378 

379 

def reset_circuit_breaker(name: str) -> bool:
    """Force a registered circuit breaker back to a pristine CLOSED state.

    Clears the failure and success counters and the last failure time.

    Args:
        name: Registry name of the breaker to reset.

    Returns:
        True if a breaker with that name existed and was reset, False otherwise.
    """
    with _registry_lock:
        breaker = _circuit_breakers.get(name)
        if breaker is None:
            return False
        # Hold the breaker's own lock so the reset cannot interleave with a
        # concurrent success/failure update.
        with breaker._lock:
            breaker.last_failure_time = None
            breaker.success_count = 0
            breaker.failure_count = 0
            breaker.state = CircuitState.CLOSED
        return True