Coverage for src / augint_library / protocols.py: 100%

34 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 20:22 +0000

1"""Protocol definitions for augint-library. 

2 

3This module demonstrates how to use Python's Protocol classes to define 

4flexible, type-safe interfaces. Protocols enable: 

5- Duck typing with static type checking 

6- Dependency injection patterns 

7- Testable, loosely coupled code 

8- Clear contracts between components 

9 

10Common Use Cases: 

11 1. Defining plugin interfaces 

12 2. Creating mockable dependencies 

13 3. Supporting multiple implementations 

14 4. Building extensible libraries 

15 5. Enabling dependency injection 

16 

17Examples: 

18 Basic protocol usage: 

19 >>> from augint_library.protocols import DataProcessor 

20 >>> 

21 >>> class CSVProcessor: 

22 ... def process(self, data: str, timeout: Optional[int] = None) -> dict: 

23 ... # Implementation details 

24 ... return {"rows": data.count("\\n")} 

25 ... 

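26 >>> # A type checker accepts this only if CSVProcessor structurally matches DataProcessor (keyword-only timeout/retries/validate on process(), plus batch_process())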

27 >>> processor: DataProcessor = CSVProcessor() 

28 

29 Using protocols for dependency injection: 

30 >>> from augint_library.protocols import CacheProvider 

31 >>> 

32 >>> class UserService: 

33 ... def __init__(self, cache: CacheProvider): 

34 ... self.cache = cache 

35 ... 

36 ... def get_user(self, user_id: str): 

37 ... cached = self.cache.get(f"user:{user_id}") 

38 ... if cached is not None:

39 ... return cached 

40 ... # Fetch from database... 

41 

42 Testing with protocol mocks: 

43 >>> from unittest.mock import Mock 

44 >>> 

45 >>> # Create a mock that satisfies the protocol 

46 >>> mock_cache = Mock(spec=CacheProvider) 

47 >>> mock_cache.get.return_value = {"id": "123", "name": "Alice"} 

48 >>> 

49 >>> service = UserService(mock_cache) 

50 >>> user = service.get_user("123") 

51 

52 Runtime protocol checking: 

53 >>> from typing import Any, Protocol, runtime_checkable

54 >>> 

55 >>> @runtime_checkable 

56 ... class EventHandler(Protocol):

57 ... def handle(self, event: dict) -> bool: ... 

58 >>> 

59 >>> def process_event(handler: Any) -> None: 

60 ... if not isinstance(handler, EventHandler): 

61 ... raise TypeError("Handler must implement EventHandler protocol") 

62 ... handler.handle({"type": "user_login"}) 

63 

64Best Practices: 

65 1. Keep protocols focused and minimal 

66 2. Use Optional[X] only for values that may be None (a parameter with a default is not automatically Optional)

67 3. Document expected behavior in docstrings 

68 4. Consider using @runtime_checkable for dynamic validation 

69 5. Prefer protocols over abstract base classes 

70 

71Advanced Patterns: 

72 Protocol inheritance: 

73 >>> class BasicProcessor(Protocol): 

74 ... def process(self, data: Any) -> Any: ... 

75 >>> 

76 >>> class AdvancedProcessor(BasicProcessor, Protocol): 

77 ... def process_batch(self, items: list[Any]) -> list[Any]: ... 

78 ... def get_stats(self) -> dict[str, Any]: ... 

79 

80 Generic protocols: 

81 >>> from typing import TypeVar, Generic 

82 >>> 

83 >>> T = TypeVar('T') 

84 >>> 

85 >>> class Repository(Protocol, Generic[T]): 

86 ... def get(self, id: str) -> Optional[T]: ... 

87 ... def save(self, entity: T) -> None: ... 

88 ... def delete(self, id: str) -> bool: ... 

89 

90Note: 

91 Protocols are a powerful feature for creating flexible, maintainable code. 

92 They work especially well with type checkers like mypy to catch errors 

93 early while maintaining Python's dynamic nature. 

94""" 

95 

96from typing import Any, Optional, Protocol, runtime_checkable 

97 

98from .constants import ( 

99 DEFAULT_BATCH_CHUNK_SIZE, 

100 DEFAULT_DATA_PROCESSOR_TIMEOUT, 

101 DEFAULT_RETRY_ATTEMPTS, 

102) 

103 

# Public API of this module: only the protocol classes are exported by
# ``from augint_library.protocols import *``.  Kept in alphabetical order.
__all__ = [
    "AdvancedProcessor",
    "CacheProvider",
    "ConfigurableClient",
    "DataProcessor",
    "EventHandler",
    "ProcessingResult",
]

112 

113 

@runtime_checkable
class ProcessingResult(Protocol):
    """Structural type describing the outcome of one processing operation.

    Any object exposing the four read-only attributes below satisfies this
    protocol; no inheritance is required. It gives callers a uniform way to
    inspect success/failure and to reach the processed payload.

    Example:
        >>> result = some_processor.process(data)
        >>> if result.success:
        ...     print(f"Processed data: {result.data}")
        ... else:
        ...     print(f"Error: {result.error}")
    """

    @property
    def success(self) -> bool:
        """``True`` when the processing operation succeeded."""
        ...

    @property
    def data(self) -> Any:
        """Processed payload; ``None`` when processing failed."""
        ...

    @property
    def error(self) -> Optional[str]:
        """Failure message; ``None`` when processing succeeded."""
        ...

    @property
    def metadata(self) -> dict[str, Any]:
        """Extra information about the processing operation.

        Commonly used keys:
            - 'duration': processing time in seconds
            - 'timestamp': when processing completed
            - 'retries': number of retry attempts made
        """
        ...

155 

156 

@runtime_checkable
class DataProcessor(Protocol):
    """Structural interface for components that process data.

    Covers single-item and batch processing with configurable options.
    Implementations are expected to cope with varied input types and to
    report outcomes consistently through ``ProcessingResult`` objects.

    Example:
        >>> processor = ConcreteProcessor()
        >>> result = processor.process(data, timeout=30, retries=2)
        >>> if result.success:
        ...     print(f"Success: {len(result.data)} items processed")
    """

    def process(
        self,
        data: Any,
        *,
        timeout: int = DEFAULT_DATA_PROCESSOR_TIMEOUT,
        retries: int = DEFAULT_RETRY_ATTEMPTS,
        validate: bool = True,
    ) -> ProcessingResult:
        """Process one piece of data using the given options.

        Args:
            data: Input to process; any type is accepted.
            timeout: Upper bound on processing time, in seconds.
            retries: How many times to retry after a failure.
            validate: When true, validate the input before processing.

        Returns:
            A ProcessingResult carrying the success flag and processed data.

        Raises:
            ValidationError: If validate=True and data is invalid.
            TimeoutError: If processing exceeds timeout.
            ProcessingError: If processing fails after all retries.
        """
        ...

    def batch_process(
        self,
        data_list: list[Any],
        *,
        parallel: bool = True,
        chunk_size: int = DEFAULT_BATCH_CHUNK_SIZE,
    ) -> list[ProcessingResult]:
        """Process several data items in one call.

        Args:
            data_list: The items to process.
            parallel: Whether items should be processed in parallel.
            chunk_size: How many items go into each processing chunk.

        Returns:
            One ProcessingResult per input item.

        Note:
            The output order matches the input order, including when
            parallel=True.
        """
        ...

222 

223 

@runtime_checkable
class ConfigurableClient(Protocol):
    """Structural interface for configurable API clients.

    Describes HTTP clients that can be set up with authentication,
    timeouts, and retry behavior, and that expose GET and POST helpers
    returning decoded JSON.

    Example:
        >>> client = SomeAPIClient(
        ...     api_key="secret",
        ...     base_url="https://api.example.com",
        ...     timeout=30
        ... )
        >>> response = client.get("/users", limit=10)
        >>> users = response["data"]
    """

    def get(self, endpoint: str, **params: Any) -> dict[str, Any]:
        """Issue a GET request against ``endpoint``.

        Args:
            endpoint: Path of the API endpoint, without the base_url.
            **params: Query parameters to send with the request.

        Returns:
            The JSON response as a dictionary.

        Raises:
            APIError: If request fails after retries.
            AuthenticationError: If API key is invalid.
            TimeoutError: If request exceeds timeout.
        """
        ...

    def post(
        self, endpoint: str, data: Optional[dict[str, Any]] = None, **params: Any
    ) -> dict[str, Any]:
        """Issue a POST request against ``endpoint``.

        Args:
            endpoint: Path of the API endpoint, without the base_url.
            data: JSON payload for the request body.
            **params: Query parameters to send with the request.

        Returns:
            The JSON response as a dictionary.

        Raises:
            APIError: If request fails after retries.
            AuthenticationError: If API key is invalid.
            TimeoutError: If request exceeds timeout.
        """
        ...

277 

278 

@runtime_checkable
class EventHandler(Protocol):
    """Structural interface for components that handle application events.

    A handler reports which event types it supports and processes
    individual events, which may carry different priority levels.

    Example:
        >>> handler = MyEventHandler()
        >>> if handler.can_handle("user_created"):
        ...     success = handler.handle_event(
        ...         "user_created",
        ...         {"user_id": 123, "email": "user@example.com"},
        ...         priority="high"
        ...     )
    """

    def handle_event(
        self, event_type: str, event_data: dict[str, Any], *, priority: str = "normal"
    ) -> bool:
        """Process one event.

        Args:
            event_type: Identifier naming the kind of event.
            event_data: Payload that accompanies the event.
            priority: Priority level of the event ("low", "normal", "high").

        Returns:
            True when the event was handled successfully, otherwise False.

        Note:
            A handling failure should be signalled by returning False and
            logging internally, not by raising an exception.
        """
        ...

    def can_handle(self, event_type: str) -> bool:
        """Report whether this handler supports ``event_type``.

        Args:
            event_type: Identifier of the event type to check.

        Returns:
            True when the handler supports that event type.
        """
        ...

325 

326 

@runtime_checkable
class CacheProvider(Protocol):
    """Structural interface for cache backends.

    Gives a backend-neutral contract (memory, Redis, file system, etc.)
    for storing, fetching, and evicting values, with optional TTL support.

    Example:
        >>> cache = SomeCacheProvider()
        >>> cache.set("user:123", {"name": "Alice"}, ttl=300)
        >>> user_data = cache.get("user:123")
        >>> if user_data is not None:
        ...     print(f"Found user: {user_data['name']}")
    """

    def get(self, key: str) -> Optional[Any]:
        """Look up a value in the cache.

        Args:
            key: The cache key to fetch.

        Returns:
            The cached value, or None when the key is missing or expired.
        """
        ...

    def set(self, key: str, value: Any, *, ttl: Optional[int] = None) -> bool:
        """Put a value into the cache.

        Args:
            key: The cache key to store under.
            value: The value to cache (must be serializable).
            ttl: Time-to-live in seconds; None means no expiration.

        Returns:
            True when the value was cached successfully.
        """
        ...

    def delete(self, key: str) -> bool:
        """Drop a key from the cache.

        Args:
            key: The cache key to remove.

        Returns:
            True when the key was removed or did not exist.
        """
        ...

    def clear(self) -> bool:
        """Remove every entry from the cache.

        Returns:
            True when the cache was cleared successfully.

        Warning:
            This can be an expensive operation; use it with care in
            production environments.
        """
        ...

388 

389 

# Example of composing several protocols into one interface
@runtime_checkable
class AdvancedProcessor(DataProcessor, EventHandler, Protocol):
    """Combined interface: data processing plus event handling.

    Shows how multiple protocols can be merged into a single interface
    specification for more complex components.

    An implementation has to satisfy DataProcessor and EventHandler, and
    additionally provide the methods declared in this class.
    """

    def process_with_events(
        self,
        data: Any,
        on_progress: Optional[Any] = None,  # a dedicated EventCallback type would live elsewhere
    ) -> ProcessingResult:
        """Process data and emit progress events along the way.

        Pairs data processing with event emission so that callers can
        follow the progress of the operation in real time.

        Args:
            data: The data to process.
            on_progress: Optional callback invoked with progress updates.

        Returns:
            A ProcessingResult describing the final processing status.

        Note:
            Emit progress events at regular intervals during processing
            so the feedback is meaningful.
        """
        ...