Coverage for src / augint_library / protocols.py: 100%
34 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 20:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 20:22 +0000
1"""Protocol definitions for augint-library.
3This module demonstrates how to use Python's Protocol classes to define
4flexible, type-safe interfaces. Protocols enable:
5- Duck typing with static type checking
6- Dependency injection patterns
7- Testable, loosely coupled code
8- Clear contracts between components
10Common Use Cases:
11 1. Defining plugin interfaces
12 2. Creating mockable dependencies
13 3. Supporting multiple implementations
14 4. Building extensible libraries
15 5. Enabling dependency injection
17Examples:
18 Basic protocol usage:
19 >>> from augint_library.protocols import DataProcessor
20 >>>
21 >>> class CSVProcessor:
22 ... def process(self, data: str, timeout: Optional[int] = None) -> dict:
23 ... # Implementation details
24 ... return {"rows": data.count("\\n")}
25 ...
26 >>> # Type checker knows CSVProcessor implements DataProcessor
27 >>> processor: DataProcessor = CSVProcessor()
29 Using protocols for dependency injection:
30 >>> from augint_library.protocols import CacheProvider
31 >>>
32 >>> class UserService:
33 ... def __init__(self, cache: CacheProvider):
34 ... self.cache = cache
35 ...
36 ... def get_user(self, user_id: str):
37 ... cached = self.cache.get(f"user:{user_id}")
38 ... if cached:
39 ... return cached
40 ... # Fetch from database...
42 Testing with protocol mocks:
43 >>> from unittest.mock import Mock
44 >>>
45 >>> # Create a mock that satisfies the protocol
46 >>> mock_cache = Mock(spec=CacheProvider)
47 >>> mock_cache.get.return_value = {"id": "123", "name": "Alice"}
48 >>>
49 >>> service = UserService(mock_cache)
50 >>> user = service.get_user("123")
52 Runtime protocol checking:
53 >>> from augint_library.protocols import EventHandler
54 >>>
55 >>> @runtime_checkable
56 ... class EventHandler(Protocol):
57 ... def handle(self, event: dict) -> bool: ...
58 >>>
59 >>> def process_event(handler: Any) -> None:
60 ... if not isinstance(handler, EventHandler):
61 ... raise TypeError("Handler must implement EventHandler protocol")
62 ... handler.handle({"type": "user_login"})
64Best Practices:
65 1. Keep protocols focused and minimal
66 2. Use Optional for optional parameters
67 3. Document expected behavior in docstrings
68 4. Consider using @runtime_checkable for dynamic validation
69 5. Prefer protocols over abstract base classes
71Advanced Patterns:
72 Protocol inheritance:
73 >>> class BasicProcessor(Protocol):
74 ... def process(self, data: Any) -> Any: ...
75 >>>
76 >>> class AdvancedProcessor(BasicProcessor, Protocol):
77 ... def process_batch(self, items: list[Any]) -> list[Any]: ...
78 ... def get_stats(self) -> dict[str, Any]: ...
80 Generic protocols:
81 >>> from typing import TypeVar, Generic
82 >>>
83 >>> T = TypeVar('T')
84 >>>
85 >>> class Repository(Protocol, Generic[T]):
86 ... def get(self, id: str) -> Optional[T]: ...
87 ... def save(self, entity: T) -> None: ...
88 ... def delete(self, id: str) -> bool: ...
90Note:
91 Protocols are a powerful feature for creating flexible, maintainable code.
92 They work especially well with type checkers like mypy to catch errors
93 early while maintaining Python's dynamic nature.
94"""
96from typing import Any, Optional, Protocol, runtime_checkable
98from .constants import (
99 DEFAULT_BATCH_CHUNK_SIZE,
100 DEFAULT_DATA_PROCESSOR_TIMEOUT,
101 DEFAULT_RETRY_ATTEMPTS,
102)
# Public API: the protocol names exported by a star-import of this module.
# Every entry corresponds to a Protocol class defined below.
__all__ = [
    "AdvancedProcessor",
    "CacheProvider",
    "ConfigurableClient",
    "DataProcessor",
    "EventHandler",
    "ProcessingResult",
]
@runtime_checkable
class ProcessingResult(Protocol):
    """Structural interface for the outcome of a data processing call.

    Any object exposing the four properties declared here satisfies the
    protocol, which gives callers one uniform way to inspect whether an
    operation succeeded and to reach the processed payload.

    The class is ``@runtime_checkable``, so ``isinstance()`` may be used
    against it; such checks only confirm that the members exist, not that
    their types match.

    Example:
        >>> result = some_processor.process(data)
        >>> if result.success:
        ...     print(f"Processed data: {result.data}")
        ... else:
        ...     print(f"Error: {result.error}")
    """

    @property
    def success(self) -> bool:
        """Report whether the processing operation succeeded."""
        ...

    @property
    def data(self) -> Any:
        """Return the processed data, or None when processing failed."""
        ...

    @property
    def error(self) -> Optional[str]:
        """Return the failure message, or None when processing succeeded."""
        ...

    @property
    def metadata(self) -> dict[str, Any]:
        """Return extra information describing the processing operation.

        Keys commonly found in the mapping:
            - 'duration': Processing time in seconds
            - 'timestamp': When processing completed
            - 'retries': Number of retry attempts made
        """
        ...
@runtime_checkable
class DataProcessor(Protocol):
    """Structural interface for components that process data.

    Covers single-item and batch processing with configurable options.
    Implementations are expected to cope gracefully with varied input
    types and to report outcomes consistently via ProcessingResult
    objects.

    Example:
        >>> processor = ConcreteProcessor()
        >>> result = processor.process(data, timeout=30, retries=2)
        >>> if result.success:
        ...     print(f"Success: {len(result.data)} items processed")
    """

    def process(
        self,
        data: Any,
        *,
        timeout: int = DEFAULT_DATA_PROCESSOR_TIMEOUT,
        retries: int = DEFAULT_RETRY_ATTEMPTS,
        validate: bool = True,
    ) -> ProcessingResult:
        """Run processing on a single piece of data.

        All options after ``data`` are keyword-only.

        Args:
            data: Input to process; any type is accepted.
            timeout: Upper bound on processing time, in seconds.
            retries: How many times to retry after a failure.
            validate: When True, check the input before processing it.

        Returns:
            A ProcessingResult carrying the success status and the
            processed data.

        Raises:
            ValidationError: If validate=True and data is invalid.
            TimeoutError: If processing exceeds timeout.
            ProcessingError: If processing fails after all retries.
        """
        ...

    def batch_process(
        self,
        data_list: list[Any],
        *,
        parallel: bool = True,
        chunk_size: int = DEFAULT_BATCH_CHUNK_SIZE,
    ) -> list[ProcessingResult]:
        """Run processing over many data items efficiently.

        Args:
            data_list: The items to process.
            parallel: When True, items are processed in parallel.
            chunk_size: How many items make up one processing chunk.

        Returns:
            One ProcessingResult per input item.

        Note:
            Output order matches input order, including when
            parallel=True.
        """
        ...
@runtime_checkable
class ConfigurableClient(Protocol):
    """Structural interface for configurable API clients.

    Describes HTTP clients that can be set up with authentication,
    timeouts, and retry behavior. Only the request methods are part of
    the protocol; how a client is constructed is left to implementations.

    Example:
        >>> client = SomeAPIClient(
        ...     api_key="secret",
        ...     base_url="https://api.example.com",
        ...     timeout=30
        ... )
        >>> response = client.get("/users", limit=10)
        >>> users = response["data"]
    """

    def get(self, endpoint: str, **params: Any) -> dict[str, Any]:
        """Issue a GET request against the given endpoint.

        Args:
            endpoint: Path of the API endpoint, excluding base_url.
            **params: Query parameters to send with the request.

        Returns:
            The JSON response, as a dictionary.

        Raises:
            APIError: If request fails after retries.
            AuthenticationError: If API key is invalid.
            TimeoutError: If request exceeds timeout.
        """
        ...

    def post(
        self,
        endpoint: str,
        data: Optional[dict[str, Any]] = None,
        **params: Any,
    ) -> dict[str, Any]:
        """Issue a POST request against the given endpoint.

        Args:
            endpoint: Path of the API endpoint, excluding base_url.
            data: JSON payload for the request body, if any.
            **params: Query parameters to send with the request.

        Returns:
            The JSON response, as a dictionary.

        Raises:
            APIError: If request fails after retries.
            AuthenticationError: If API key is invalid.
            TimeoutError: If request exceeds timeout.
        """
        ...
@runtime_checkable
class EventHandler(Protocol):
    """Structural interface for components that handle application events.

    A handler advertises which event types it supports through
    ``can_handle`` and processes individual events, at a given priority
    level, through ``handle_event``.

    Example:
        >>> handler = MyEventHandler()
        >>> if handler.can_handle("user_created"):
        ...     success = handler.handle_event(
        ...         "user_created",
        ...         {"user_id": 123, "email": "user@example.com"},
        ...         priority="high"
        ...     )
    """

    def handle_event(
        self,
        event_type: str,
        event_data: dict[str, Any],
        *,
        priority: str = "normal",
    ) -> bool:
        """Process one event.

        Args:
            event_type: Identifier naming the kind of event.
            event_data: Payload that accompanies the event.
            priority: Keyword-only priority level ("low", "normal", "high").

        Returns:
            True when the event was handled successfully, otherwise False.

        Note:
            A handling failure should be signalled by returning False and
            logging internally, not by raising an exception.
        """
        ...

    def can_handle(self, event_type: str) -> bool:
        """Tell whether this handler supports the given event type.

        Args:
            event_type: Identifier of the event type being queried.

        Returns:
            True when the handler supports that event type.
        """
        ...
@runtime_checkable
class CacheProvider(Protocol):
    """Structural interface for cache backends.

    A backend-agnostic contract (memory, Redis, file system, etc.) for
    key/value caching with optional time-to-live support.

    Example:
        >>> cache = SomeCacheProvider()
        >>> cache.set("user:123", {"name": "Alice"}, ttl=300)
        >>> user_data = cache.get("user:123")
        >>> if user_data is not None:
        ...     print(f"Found user: {user_data['name']}")
    """

    def get(self, key: str) -> Optional[Any]:
        """Look up a value in the cache.

        Args:
            key: The cache key to look up.

        Returns:
            The cached value, or None when the key is missing or expired.
        """
        ...

    def set(
        self,
        key: str,
        value: Any,
        *,
        ttl: Optional[int] = None,
    ) -> bool:
        """Put a value into the cache.

        Args:
            key: The cache key to store under.
            value: The value to cache (must be serializable).
            ttl: Keyword-only time-to-live in seconds; None means the
                entry does not expire.

        Returns:
            True when the value was cached successfully.
        """
        ...

    def delete(self, key: str) -> bool:
        """Drop a key from the cache.

        Args:
            key: The cache key to drop.

        Returns:
            True when the key was removed or was not present to begin with.
        """
        ...

    def clear(self) -> bool:
        """Remove every entry from the cache.

        Returns:
            True when the cache was cleared successfully.

        Warning:
            Clearing may be expensive; use it with care in production
            environments.
        """
        ...
390# Protocol composition example
@runtime_checkable
class AdvancedProcessor(DataProcessor, EventHandler, Protocol):
    """Composite protocol: data processing plus event handling.

    Shows how several protocols can be merged into one interface for
    richer components. A conforming implementation has to provide
    everything DataProcessor and EventHandler require, as well as the
    method declared here.
    """

    def process_with_events(
        self,
        data: Any,
        # Typed loosely for now; a dedicated EventCallback type would be
        # defined elsewhere.
        on_progress: Optional[Any] = None,
    ) -> ProcessingResult:
        """Process data and emit progress events along the way.

        Pairs processing with event emission so the caller can follow
        progress in real time.

        Args:
            data: The data to process.
            on_progress: Optional callback invoked with progress updates.

        Returns:
            A ProcessingResult describing the final processing status.

        Note:
            Implementations should emit progress events at regular
            intervals so the feedback is meaningful.
        """
        ...