ai_lls_lib
AI LLS Library - Core business logic for Landline Scrubber.
This library provides phone verification and DNC checking capabilities.
Version 2.0.0 establishes clean semantic versioning baseline. All version management now controlled by Python Semantic Release.
Dependencies optimized for Lambda deployment (removed unused pandas/pyarrow).
1""" 2AI LLS Library - Core business logic for Landline Scrubber. 3 4This library provides phone verification and DNC checking capabilities. 5 6Version 2.0.0 establishes clean semantic versioning baseline. 7All version management now controlled by Python Semantic Release. 8 9Dependencies optimized for Lambda deployment (removed unused pandas/pyarrow). 10""" 11 12from ai_lls_lib.apikeys import ( 13 KeyNotFoundError, 14 LimitExceededError, 15 ManagedApiKeyService, 16 RevokedKeyError, 17) 18from ai_lls_lib.common import DecimalEncoder, extract_area_code 19from ai_lls_lib.core.cache import DynamoDBCache 20from ai_lls_lib.core.models import ( 21 BulkJob, 22 BulkJobStatus, 23 JobStatus, 24 LineType, 25 PhoneVerification, 26 VerificationSource, 27) 28from ai_lls_lib.core.processor import BulkProcessor 29from ai_lls_lib.core.verifier import PhoneVerifier 30from ai_lls_lib.files import FileService 31from ai_lls_lib.key_management import ( 32 compute_key_hash, 33 generate_key_id, 34 generate_managed_key, 35 validate_expiration_days, 36) 37from ai_lls_lib.providers.exceptions import ProviderError 38 39__version__ = "3.13.0" 40 41__all__ = [ 42 "PhoneVerification", 43 "BulkJob", 44 "BulkJobStatus", 45 "LineType", 46 "VerificationSource", 47 "JobStatus", 48 "PhoneVerifier", 49 "BulkProcessor", 50 "DynamoDBCache", 51 "compute_key_hash", 52 "generate_key_id", 53 "generate_managed_key", 54 "validate_expiration_days", 55 "DecimalEncoder", 56 "extract_area_code", 57 "ManagedApiKeyService", 58 "KeyNotFoundError", 59 "RevokedKeyError", 60 "LimitExceededError", 61 "ProviderError", 62 "FileService", 63]
class PhoneVerification(BaseModel):
    """Result of phone number verification"""

    # ``...`` as the default marks every field below as required.
    phone_number: str = Field(..., description="E.164 formatted phone number")
    line_type: LineType = Field(..., description="Type of phone line")
    dnc: bool = Field(..., description="Whether number is on DNC list")
    cached: bool = Field(..., description="Whether result came from cache")
    verified_at: datetime = Field(..., description="When verification occurred")
    source: VerificationSource = Field(..., description="Source of verification data")

    class Config:
        # Emit datetime values as ISO-8601 strings when the model is dumped to JSON.
        # NOTE(review): class-based Config / json_encoders is the Pydantic v1 style --
        # confirm it is still honored by the Pydantic version this package pins.
        json_encoders = {datetime: lambda v: v.isoformat()}
Result of phone number verification
class BulkJob(BaseModel):
    """Bulk processing job metadata"""

    # Both fields are required (``...`` default); BulkJobStatus extends this base.
    job_id: str = Field(..., description="Unique job identifier")
    status: JobStatus = Field(..., description="Current job status")
Bulk processing job metadata
class BulkJobStatus(BulkJob):
    """Extended bulk job status with progress info"""

    # Progress and outcome fields are optional and default to None;
    # only created_at is required in addition to the inherited job_id/status.
    total_rows: int | None = Field(None, description="Total rows to process")
    processed_rows: int | None = Field(None, description="Rows processed so far")
    result_url: str | None = Field(None, description="S3 URL of results")
    created_at: datetime = Field(..., description="Job creation time")
    completed_at: datetime | None = Field(None, description="Job completion time")
    error: str | None = Field(None, description="Error message if failed")
Extended bulk job status with progress info
class LineType(StrEnum):
    """Phone line type enumeration"""

    # StrEnum members are also ``str`` instances, so each member compares
    # equal to its lowercase literal value (e.g. LineType.MOBILE == "mobile").
    MOBILE = "mobile"
    LANDLINE = "landline"
    VOIP = "voip"
    UNKNOWN = "unknown"
Phone line type enumeration
class VerificationSource(StrEnum):
    """Source of verification data"""

    # StrEnum members are also ``str`` instances and compare equal to their values.
    API = "api"
    CACHE = "cache"
    BULK_IMPORT = "bulk_import"
Source of verification data
class JobStatus(StrEnum):
    """Bulk job status enumeration"""

    # StrEnum members are also ``str`` instances and compare equal to their values.
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
Bulk job status enumeration
18class PhoneVerifier: 19 """Verifies phone numbers for line type and DNC status""" 20 21 def __init__( 22 self, cache: DynamoDBCache | None = None, provider: VerificationProvider | None = None 23 ): 24 """ 25 Initialize phone verifier. 26 27 Args: 28 cache: Optional DynamoDB cache for storing results 29 provider: Verification provider (defaults to ExternalAPIProvider) 30 """ 31 self.cache = cache 32 self.provider = provider or ExternalAPIProvider() 33 logger.debug("PhoneVerifier initialized") 34 35 def normalize_phone(self, phone: str) -> str: 36 """Normalize phone to E.164 format""" 37 try: 38 # Parse with US as default country 39 parsed = phonenumbers.parse(phone, "US") 40 if not phonenumbers.is_valid_number(parsed): 41 raise ValueError(f"Invalid phone number: {phone}") 42 43 # Format as E.164 44 return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) 45 except Exception as e: 46 logger.error(f"Phone normalization failed: {str(e)}") 47 raise ValueError(f"Invalid phone format: {phone}") from e 48 49 def verify(self, phone: str) -> PhoneVerification: 50 """Verify phone number for line type and DNC status""" 51 normalized = self.normalize_phone(phone) 52 53 # Check cache first if available 54 if self.cache: 55 cached = self.cache.get(normalized) 56 if cached: 57 return cached 58 59 # Use provider to verify 60 line_type, dnc_status = self.provider.verify_phone(normalized) 61 62 result = PhoneVerification( 63 phone_number=normalized, 64 line_type=line_type, 65 dnc=dnc_status, 66 cached=False, 67 verified_at=datetime.now(UTC), 68 source=VerificationSource.API, 69 ) 70 71 # Store in cache if available 72 if self.cache: 73 try: 74 self.cache.set(normalized, result) 75 except Exception as e: 76 logger.warning(f"Failed to cache result: {e}") 77 # Continue without caching - don't fail the verification 78 79 return result 80 81 def _check_line_type(self, phone: str) -> LineType: 82 """ 83 Check line type (for backwards compatibility with CLI). 
84 Delegates to provider. 85 """ 86 line_type, _ = self.provider.verify_phone(phone) 87 return line_type 88 89 def _check_dnc(self, phone: str) -> bool: 90 """ 91 Check DNC status (for backwards compatibility with CLI). 92 Delegates to provider. 93 """ 94 _, dnc_status = self.provider.verify_phone(phone) 95 return dnc_status
Verifies phone numbers for line type and DNC status
def __init__(
    self, cache: DynamoDBCache | None = None, provider: VerificationProvider | None = None
):
    """
    Set up the verifier with its optional cache and its lookup provider.

    Args:
        cache: DynamoDB-backed cache for results, or None to run uncached
        provider: Backend that performs the lookup; when omitted (or falsy)
            a new ExternalAPIProvider is created
    """
    # Fall back to the default provider only when the caller supplied none.
    if not provider:
        provider = ExternalAPIProvider()

    self.cache = cache
    self.provider = provider
    logger.debug("PhoneVerifier initialized")
Initialize phone verifier.
Args: cache: Optional DynamoDB cache for storing results provider: Verification provider (defaults to ExternalAPIProvider)
def normalize_phone(self, phone: str) -> str:
    """Return *phone* in E.164 form, parsing with US as the default region.

    Raises:
        ValueError: If the input cannot be parsed or is not a valid number.
            Every failure (including the validity check below) is logged and
            re-raised as ``Invalid phone format``.
    """
    try:
        candidate = phonenumbers.parse(phone, "US")
        if phonenumbers.is_valid_number(candidate):
            return phonenumbers.format_number(candidate, phonenumbers.PhoneNumberFormat.E164)
        # Parsable but not a real number: funnel into the common handler below.
        raise ValueError(f"Invalid phone number: {phone}")
    except Exception as e:
        logger.error(f"Phone normalization failed: {str(e)}")
        raise ValueError(f"Invalid phone format: {phone}") from e
Normalize phone to E.164 format
49 def verify(self, phone: str) -> PhoneVerification: 50 """Verify phone number for line type and DNC status""" 51 normalized = self.normalize_phone(phone) 52 53 # Check cache first if available 54 if self.cache: 55 cached = self.cache.get(normalized) 56 if cached: 57 return cached 58 59 # Use provider to verify 60 line_type, dnc_status = self.provider.verify_phone(normalized) 61 62 result = PhoneVerification( 63 phone_number=normalized, 64 line_type=line_type, 65 dnc=dnc_status, 66 cached=False, 67 verified_at=datetime.now(UTC), 68 source=VerificationSource.API, 69 ) 70 71 # Store in cache if available 72 if self.cache: 73 try: 74 self.cache.set(normalized, result) 75 except Exception as e: 76 logger.warning(f"Failed to cache result: {e}") 77 # Continue without caching - don't fail the verification 78 79 return result
Verify phone number for line type and DNC status
18class BulkProcessor: 19 """Process CSV files for bulk phone verification""" 20 21 def __init__(self, verifier: PhoneVerifier): 22 self.verifier = verifier 23 24 def process_csv(self, csv_text: str, phone_column: str = "phone") -> list[PhoneVerification]: 25 """ 26 Process CSV text content. 27 Returns list of verification results. 28 """ 29 results = [] 30 31 try: 32 # Strip UTF-8 BOM if present (Excel on Windows adds this) 33 csv_text = csv_text.lstrip("\ufeff") 34 35 # Use StringIO to parse CSV text 36 csv_file = StringIO(csv_text) 37 reader = csv.DictReader(csv_file) 38 39 # Find phone column (case-insensitive) 40 headers = reader.fieldnames or [] 41 phone_col = self._find_phone_column(headers, phone_column) 42 43 if not phone_col: 44 raise ValueError(f"Phone column '{phone_column}' not found in CSV") 45 46 logger.info(f"Starting CSV processing using phone column '{phone_col}'") 47 48 for row_num, row in enumerate(reader, start=2): # Start at 2 (header is 1) 49 try: 50 phone = row.get(phone_col, "").strip() 51 if not phone: 52 logger.warning(f"Empty phone at row {row_num}") 53 continue 54 55 # Verify phone 56 result = self.verifier.verify(phone) 57 results.append(result) 58 59 # Log progress every 100 rows 60 if len(results) % 100 == 0: 61 logger.info(f"Processed {len(results)} phones (at row {row_num})") 62 63 except ValueError as e: 64 logger.warning(f"Invalid phone at row {row_num}: {str(e)}") 65 continue 66 except Exception as e: 67 logger.error(f"Error processing row {row_num}: {str(e)}") 68 continue 69 70 logger.info(f"Completed processing {len(results)} valid phones") 71 72 except Exception as e: 73 logger.error(f"CSV processing failed: {str(e)}") 74 raise 75 76 return results 77 78 def _find_phone_column(self, headers: list[str] | Sequence[str], preferred: str) -> str | None: 79 """Find phone column in headers (case-insensitive)""" 80 # First try exact match 81 for header in headers: 82 if header.lower() == preferred.lower(): 83 return header 84 85 # 
Common phone column names 86 phone_patterns = [ 87 "phone", 88 "phone_number", 89 "phonenumber", 90 "mobile", 91 "cell", 92 "telephone", 93 "tel", 94 "number", 95 "contact", 96 ] 97 98 for header in headers: 99 header_lower = header.lower() 100 for pattern in phone_patterns: 101 if pattern in header_lower: 102 logger.info(f"Using column '{header}' as phone column") 103 return header 104 105 return None 106 107 def generate_results_csv(self, original_csv_text: str, results: list[PhoneVerification]) -> str: 108 """ 109 Generate CSV with original data plus verification results. 110 Adds columns: line_type, dnc 111 Returns CSV text string. 112 """ 113 # Create lookup dict 114 results_map = {r.phone_number: r for r in results} 115 116 # Parse original CSV (strip UTF-8 BOM if present) 117 original_csv_text = original_csv_text.lstrip("\ufeff") 118 input_file = StringIO(original_csv_text) 119 reader = csv.DictReader(input_file) 120 headers = list(reader.fieldnames or []) 121 122 # Add new columns 123 output_headers = headers + ["line_type", "dnc"] 124 125 # Create output CSV in memory 126 output = StringIO() 127 writer = csv.DictWriter(output, fieldnames=output_headers) 128 writer.writeheader() 129 130 phone_col = self._find_phone_column(headers, "phone") 131 132 for row in reader: 133 phone = row.get(phone_col, "").strip() 134 135 # Try to normalize for lookup 136 try: 137 normalized = self.verifier.normalize_phone(phone) 138 if normalized in results_map: 139 result = results_map[normalized] 140 row["line_type"] = result.line_type.value 141 row["dnc"] = "true" if result.dnc else "false" 142 else: 143 row["line_type"] = "unknown" 144 row["dnc"] = "" 145 except Exception: 146 row["line_type"] = "invalid" 147 row["dnc"] = "" 148 149 writer.writerow(row) 150 151 # Return CSV text 152 return output.getvalue() 153 154 def process_csv_stream( 155 self, lines: Iterable[str], phone_column: str = "phone", batch_size: int = 100 156 ) -> Iterator[list[PhoneVerification]]: 157 """ 158 
Process CSV lines as a stream, yielding batches of results. 159 Memory-efficient for large files. 160 161 Args: 162 lines: Iterator of CSV lines (including header) 163 phone_column: Column name containing phone numbers 164 batch_size: Number of results to accumulate before yielding 165 166 Yields: 167 Batches of PhoneVerification results 168 """ 169 lines_list = list(lines) # Need to iterate twice - once for headers, once for data 170 171 if not lines_list: 172 logger.error("Empty CSV stream") 173 return 174 175 # Parse header (strip UTF-8 BOM if present) 176 header_line = lines_list[0].lstrip("\ufeff") 177 reader = csv.DictReader(StringIO(header_line)) 178 headers = reader.fieldnames or [] 179 phone_col = self._find_phone_column(headers, phone_column) 180 181 if not phone_col: 182 raise ValueError(f"Phone column '{phone_column}' not found in CSV") 183 184 batch = [] 185 row_num = 2 # Start at 2 (header is 1) 186 total_processed = 0 187 188 # Process data lines 189 for line in lines_list[1:]: 190 if not line.strip(): 191 continue 192 193 try: 194 # Parse single line 195 row = next(csv.DictReader(StringIO(line), fieldnames=headers)) 196 phone = row.get(phone_col, "").strip() 197 198 if not phone: 199 logger.warning(f"Empty phone at row {row_num}") 200 row_num += 1 201 continue 202 203 # Verify phone 204 result = self.verifier.verify(phone) 205 batch.append(result) 206 total_processed += 1 207 208 # Yield batch when full 209 if len(batch) >= batch_size: 210 logger.info( 211 f"Processed batch of {len(batch)} phones (total: {total_processed}, at row {row_num})" 212 ) 213 yield batch 214 batch = [] 215 216 except ValueError as e: 217 logger.warning(f"Invalid phone at row {row_num}: {str(e)}") 218 except Exception as e: 219 logger.error(f"Error processing row {row_num}: {str(e)}") 220 finally: 221 row_num += 1 222 223 # Yield remaining results 224 if batch: 225 logger.info(f"Processed final batch of {len(batch)} phones (total: {total_processed})") 226 yield batch 227 228 
logger.info(f"Stream processing completed. Total processed: {total_processed}") 229 230 def generate_results_csv_stream( 231 self, 232 original_lines: Iterable[str], 233 results_stream: Iterator[list[PhoneVerification]], 234 phone_column: str = "phone", 235 ) -> Iterator[str]: 236 """ 237 Generate CSV results as a stream, line by line. 238 Memory-efficient for large files. 239 240 Args: 241 original_lines: Iterator of original CSV lines 242 results_stream: Iterator of batched PhoneVerification results 243 phone_column: Column name containing phone numbers 244 245 Yields: 246 CSV lines with verification results added 247 """ 248 lines_iter = iter(original_lines) 249 250 # Read and yield modified header 251 try: 252 header_line = next(lines_iter).lstrip("\ufeff") 253 reader = csv.DictReader(StringIO(header_line)) 254 headers = list(reader.fieldnames or []) 255 256 # Add new columns 257 output_headers = headers + ["line_type", "dnc"] 258 yield ",".join(output_headers) + "\n" 259 260 phone_col = self._find_phone_column(headers, phone_column) 261 262 except StopIteration: 263 return 264 265 # Build results lookup from stream 266 results_map = {} 267 for batch in results_stream: 268 for result in batch: 269 results_map[result.phone_number] = result 270 271 # Reset lines iterator 272 lines_iter = iter(original_lines) 273 next(lines_iter) # Skip header 274 275 # Process and yield data lines 276 for line in lines_iter: 277 if not line.strip(): 278 continue 279 280 row = next(csv.DictReader(StringIO(line), fieldnames=headers)) 281 phone = row.get(phone_col, "").strip() 282 283 # Add verification results 284 try: 285 normalized = self.verifier.normalize_phone(phone) 286 if normalized in results_map: 287 result = results_map[normalized] 288 row["line_type"] = result.line_type.value 289 row["dnc"] = "true" if result.dnc else "false" 290 else: 291 row["line_type"] = "unknown" 292 row["dnc"] = "" 293 except Exception: 294 row["line_type"] = "invalid" 295 row["dnc"] = "" 296 297 # 
Write row 298 output = StringIO() 299 writer = csv.DictWriter(output, fieldnames=output_headers) 300 writer.writerow(row) 301 yield output.getvalue()
Process CSV files for bulk phone verification
def process_csv(self, csv_text: str, phone_column: str = "phone") -> list[PhoneVerification]:
    """
    Verify every phone number found in a CSV document.

    Args:
        csv_text: Full CSV content including the header row. A leading
            UTF-8 BOM (as written by Excel on Windows) is stripped.
        phone_column: Preferred name of the column holding phone numbers;
            resolved through ``_find_phone_column``.

    Returns:
        Verification results for the rows that could be verified, in input
        order. Rows with an empty/missing phone cell or a failed
        verification are logged and skipped.

    Raises:
        ValueError: If no phone column can be located in the header.
    """
    results: list[PhoneVerification] = []

    try:
        reader = csv.DictReader(StringIO(csv_text.lstrip("\ufeff")))

        headers = reader.fieldnames or []
        phone_col = self._find_phone_column(headers, phone_column)
        if not phone_col:
            raise ValueError(f"Phone column '{phone_column}' not found in CSV")

        logger.info(f"Starting CSV processing using phone column '{phone_col}'")

        for row in reader:
            # DictReader skips blank lines, so take the physical line number
            # from the reader instead of counting yielded rows.
            row_num = reader.line_num

            # Rows shorter than the header carry None for the missing cells;
            # treat that the same as an empty phone instead of crashing.
            phone = (row.get(phone_col) or "").strip()
            if not phone:
                logger.warning(f"Empty phone at row {row_num}")
                continue

            try:
                result = self.verifier.verify(phone)
            except ValueError as e:
                logger.warning(f"Invalid phone at row {row_num}: {str(e)}")
                continue
            except Exception as e:
                # Best effort: one failing row must not abort the whole file.
                logger.error(f"Error processing row {row_num}: {str(e)}")
                continue

            results.append(result)

            # Log progress every 100 verified phones
            if len(results) % 100 == 0:
                logger.info(f"Processed {len(results)} phones (at row {row_num})")

        logger.info(f"Completed processing {len(results)} valid phones")

    except Exception as e:
        logger.error(f"CSV processing failed: {str(e)}")
        raise

    return results
Process CSV text content. Returns list of verification results.
def generate_results_csv(
    self,
    original_csv_text: str,
    results: list[PhoneVerification],
    phone_column: str = "phone",
) -> str:
    """
    Generate CSV with original data plus verification results.

    Appends two columns, ``line_type`` and ``dnc``, to every input row.

    Args:
        original_csv_text: Original CSV content including the header row.
            A leading UTF-8 BOM is stripped.
        results: Verification results, matched to rows by their
            E.164 ``phone_number``.
        phone_column: Preferred name of the phone column, resolved through
            ``_find_phone_column``. Pass the same value that was given to
            ``process_csv`` so both passes read the same column.

    Returns:
        CSV text. ``line_type`` is the verified type, ``"unknown"`` when the
        number normalizes but has no result, or ``"invalid"`` when the cell
        is missing or cannot be normalized. ``dnc`` is ``"true"``/``"false"``
        for matched rows and empty otherwise.
    """
    results_map = {r.phone_number: r for r in results}

    reader = csv.DictReader(StringIO(original_csv_text.lstrip("\ufeff")))
    headers = list(reader.fieldnames or [])
    output_headers = headers + ["line_type", "dnc"]

    output = StringIO()
    # Rows with more cells than the header store the surplus under the key
    # None; "ignore" drops that surplus instead of raising in writerow().
    writer = csv.DictWriter(output, fieldnames=output_headers, extrasaction="ignore")
    writer.writeheader()

    phone_col = self._find_phone_column(headers, phone_column)

    for row in reader:
        # The cell is None for short rows and absent when no phone column
        # was found; both fall through to the "invalid" branch below.
        raw = row.get(phone_col) if phone_col else None
        phone = raw.strip() if isinstance(raw, str) else ""

        try:
            normalized = self.verifier.normalize_phone(phone)
        except Exception:
            row["line_type"] = "invalid"
            row["dnc"] = ""
        else:
            result = results_map.get(normalized)
            if result is None:
                row["line_type"] = "unknown"
                row["dnc"] = ""
            else:
                row["line_type"] = result.line_type.value
                row["dnc"] = "true" if result.dnc else "false"

        writer.writerow(row)

    return output.getvalue()
Generate CSV with original data plus verification results. Adds columns: line_type, dnc Returns CSV text string.
def process_csv_stream(
    self, lines: Iterable[str], phone_column: str = "phone", batch_size: int = 100
) -> Iterator[list[PhoneVerification]]:
    """
    Process CSV lines as a stream, yielding batches of results.

    Lines are pulled from *lines* one at a time, so the input is never
    materialized in memory. Each line is parsed as one complete CSV record;
    quoted fields that span several lines are therefore not supported.

    Args:
        lines: Iterable of CSV lines (including header); may be a one-shot
            iterator such as a file object or generator
        phone_column: Column name containing phone numbers
        batch_size: Number of results to accumulate before yielding

    Yields:
        Batches of PhoneVerification results. Rows with an empty phone or a
        failed verification are logged and skipped.

    Raises:
        ValueError: If no phone column can be located in the header (raised
            when the generator is first advanced).
    """
    lines_iter = iter(lines)

    first_line = next(lines_iter, None)
    if first_line is None:
        logger.error("Empty CSV stream")
        return

    # Parse header (strip UTF-8 BOM if present)
    header_line = first_line.lstrip("\ufeff")
    headers = csv.DictReader(StringIO(header_line)).fieldnames or []
    phone_col = self._find_phone_column(headers, phone_column)

    if not phone_col:
        raise ValueError(f"Phone column '{phone_column}' not found in CSV")

    batch: list[PhoneVerification] = []
    total_processed = 0

    # enumerate() counts every physical line (blank ones included), so the
    # row numbers in log messages match the input; the header is row 1.
    for row_num, line in enumerate(lines_iter, start=2):
        if not line.strip():
            continue

        try:
            row = next(csv.DictReader(StringIO(line), fieldnames=headers))
            # Short rows carry None for missing cells; treat as empty.
            phone = (row.get(phone_col) or "").strip()

            if not phone:
                logger.warning(f"Empty phone at row {row_num}")
                continue

            result = self.verifier.verify(phone)
        except ValueError as e:
            logger.warning(f"Invalid phone at row {row_num}: {str(e)}")
            continue
        except Exception as e:
            # Best effort: one failing row must not abort the whole stream.
            logger.error(f"Error processing row {row_num}: {str(e)}")
            continue

        batch.append(result)
        total_processed += 1

        # Yield outside the try block so exceptions injected by the consumer
        # are not mistaken for row-processing errors.
        if len(batch) >= batch_size:
            logger.info(
                f"Processed batch of {len(batch)} phones (total: {total_processed}, at row {row_num})"
            )
            yield batch
            batch = []

    if batch:
        logger.info(f"Processed final batch of {len(batch)} phones (total: {total_processed})")
        yield batch

    logger.info(f"Stream processing completed. Total processed: {total_processed}")
Process CSV lines as a stream, yielding batches of results. Memory-efficient for large files.
Args: lines: Iterator of CSV lines (including header) phone_column: Column name containing phone numbers batch_size: Number of results to accumulate before yielding
Yields: Batches of PhoneVerification results
def generate_results_csv_stream(
    self,
    original_lines: Iterable[str],
    results_stream: Iterator[list[PhoneVerification]],
    phone_column: str = "phone",
) -> Iterator[str]:
    """
    Generate CSV results as a stream, line by line.

    The header is emitted first, then *results_stream* is drained into a
    lookup table, then one output line is produced per non-blank input line.
    *original_lines* is traversed exactly once, so one-shot iterators (file
    objects, generators) work as well as lists. Each line is parsed as one
    complete CSV record.

    Args:
        original_lines: Iterable of original CSV lines (including header)
        results_stream: Iterator of batched PhoneVerification results
        phone_column: Column name containing phone numbers

    Yields:
        CSV lines with ``line_type`` and ``dnc`` appended. ``line_type`` is
        ``"unknown"`` when the number has no result and ``"invalid"`` when
        the phone cell is missing or cannot be normalized.
    """
    lines_iter = iter(original_lines)

    first_line = next(lines_iter, None)
    if first_line is None:
        return

    header_line = first_line.lstrip("\ufeff")
    headers = list(csv.DictReader(StringIO(header_line)).fieldnames or [])
    output_headers = headers + ["line_type", "dnc"]

    # Write the header through the csv module so names containing commas or
    # quotes are escaped; the "\n" terminator of the header line is kept.
    buffer = StringIO()
    csv.writer(buffer, lineterminator="\n").writerow(output_headers)
    yield buffer.getvalue()

    phone_col = self._find_phone_column(headers, phone_column)

    # Build results lookup from stream
    results_map = {}
    for batch in results_stream:
        for result in batch:
            results_map[result.phone_number] = result

    # Surplus cells (stored under the key None) are dropped instead of
    # making writerow() raise.
    writer = csv.DictWriter(buffer, fieldnames=output_headers, extrasaction="ignore")

    # Keep consuming the same iterator: the header is already behind us, so
    # there is nothing to reset or skip.
    for line in lines_iter:
        if not line.strip():
            continue

        row = next(csv.DictReader(StringIO(line), fieldnames=headers))
        # The cell is None for short rows and absent when no phone column
        # was found; both end up marked "invalid" below.
        raw = row.get(phone_col) if phone_col else None
        phone = raw.strip() if isinstance(raw, str) else ""

        try:
            normalized = self.verifier.normalize_phone(phone)
        except Exception:
            row["line_type"] = "invalid"
            row["dnc"] = ""
        else:
            result = results_map.get(normalized)
            if result is None:
                row["line_type"] = "unknown"
                row["dnc"] = ""
            else:
                row["line_type"] = result.line_type.value
                row["dnc"] = "true" if result.dnc else "false"

        # Reuse one buffer for every row.
        buffer.seek(0)
        buffer.truncate(0)
        writer.writerow(row)
        yield buffer.getvalue()
Generate CSV results as a stream, line by line. Memory-efficient for large files.
Args: original_lines: Iterator of original CSV lines results_stream: Iterator of batched PhoneVerification results phone_column: Column name containing phone numbers
Yields: CSV lines with verification results added
17class DynamoDBCache: 18 """Cache for phone verification results using DynamoDB with TTL""" 19 20 def __init__(self, table_name: str, ttl_days: int = 90): 21 self.table_name = table_name 22 self.ttl_days = ttl_days 23 self.dynamodb = boto3.resource("dynamodb") 24 self.table = self.dynamodb.Table(table_name) 25 26 def get(self, phone_number: str) -> PhoneVerification | None: 27 """Get cached verification result""" 28 try: 29 response = self.table.get_item(Key={"phone_number": phone_number}) 30 31 if "Item" not in response: 32 logger.info(f"Cache miss for {phone_number[:6]}***") 33 return None 34 35 item: dict[str, Any] = response["Item"] 36 logger.info(f"Cache hit for {phone_number[:6]}***") 37 38 return PhoneVerification( 39 phone_number=str(item["phone_number"]), 40 line_type=LineType(str(item["line_type"])), 41 dnc=bool(item["dnc"]), 42 cached=True, 43 verified_at=datetime.fromisoformat(str(item["verified_at"])), 44 source=VerificationSource.CACHE, 45 ) 46 47 except Exception as e: 48 logger.error(f"Cache get error: {str(e)}") 49 return None 50 51 def set(self, phone_number: str, verification: PhoneVerification) -> None: 52 """Store verification result in cache""" 53 try: 54 ttl = int((datetime.now(UTC) + timedelta(days=self.ttl_days)).timestamp()) 55 56 self.table.put_item( 57 Item={ 58 "phone_number": phone_number, 59 "line_type": verification.line_type.value, 60 "dnc": verification.dnc, 61 "verified_at": verification.verified_at.isoformat(), 62 "source": verification.source.value, 63 "ttl": ttl, 64 } 65 ) 66 67 logger.info(f"Cached result for {phone_number[:6]}***") 68 69 except Exception as e: 70 logger.error(f"Cache set error: {str(e)}") 71 # Don't fail the request if cache write fails 72 73 def batch_get(self, phone_numbers: list[str]) -> dict[str, PhoneVerification | None]: 74 """Get multiple cached results""" 75 results: dict[str, PhoneVerification | None] = {} 76 77 # DynamoDB batch get (max 100 items per request) 78 for i in range(0, 
len(phone_numbers), 100): 79 batch = phone_numbers[i : i + 100] 80 81 try: 82 response = self.dynamodb.batch_get_item( 83 RequestItems={ 84 self.table_name: {"Keys": [{"phone_number": phone} for phone in batch]} 85 } 86 ) 87 88 for item in response.get("Responses", {}).get(self.table_name, []): 89 phone = str(item["phone_number"]) 90 results[phone] = PhoneVerification( 91 phone_number=phone, 92 line_type=LineType(str(item["line_type"])), 93 dnc=bool(item["dnc"]), 94 cached=True, 95 verified_at=datetime.fromisoformat(str(item["verified_at"])), 96 source=VerificationSource.CACHE, 97 ) 98 99 except Exception as e: 100 logger.error(f"Batch cache get error: {str(e)}") 101 102 # Fill in None for misses 103 for phone in phone_numbers: 104 if phone not in results: 105 results[phone] = None 106 107 return results
Cache for phone verification results using DynamoDB with TTL
def get(self, phone_number: str) -> PhoneVerification | None:
    """Fetch the cached verification for *phone_number*, or None.

    None is returned both on a cache miss and when the lookup or the
    reconstruction of the stored record fails; failures are logged.
    """
    try:
        response = self.table.get_item(Key={"phone_number": phone_number})

        if "Item" in response:
            record = response["Item"]
            # Only the first six characters of the number are logged.
            logger.info(f"Cache hit for {phone_number[:6]}***")
            return PhoneVerification(
                phone_number=str(record["phone_number"]),
                line_type=LineType(str(record["line_type"])),
                dnc=bool(record["dnc"]),
                cached=True,
                verified_at=datetime.fromisoformat(str(record["verified_at"])),
                source=VerificationSource.CACHE,
            )

        logger.info(f"Cache miss for {phone_number[:6]}***")
        return None

    except Exception as e:
        logger.error(f"Cache get error: {str(e)}")
        return None
Get cached verification result
51 def set(self, phone_number: str, verification: PhoneVerification) -> None: 52 """Store verification result in cache""" 53 try: 54 ttl = int((datetime.now(UTC) + timedelta(days=self.ttl_days)).timestamp()) 55 56 self.table.put_item( 57 Item={ 58 "phone_number": phone_number, 59 "line_type": verification.line_type.value, 60 "dnc": verification.dnc, 61 "verified_at": verification.verified_at.isoformat(), 62 "source": verification.source.value, 63 "ttl": ttl, 64 } 65 ) 66 67 logger.info(f"Cached result for {phone_number[:6]}***") 68 69 except Exception as e: 70 logger.error(f"Cache set error: {str(e)}") 71 # Don't fail the request if cache write fails
Store verification result in cache
def batch_get(self, phone_numbers: list[str]) -> dict[str, PhoneVerification | None]:
    """Get multiple cached results.

    Args:
        phone_numbers: Phone numbers to look up; duplicates are allowed.

    Returns:
        A dict with one entry per distinct requested number: the cached
        PhoneVerification, or None when the number is not cached or could
        not be fetched. Errors are logged and never raised.
    """
    import time  # local import: only needed for the retry backoff below

    max_attempts = 4
    results: dict[str, PhoneVerification | None] = {}

    # BatchGetItem rejects a request that contains the same key twice, which
    # would turn the whole chunk into misses, so de-duplicate up front
    # (order preserved).
    unique_numbers = list(dict.fromkeys(phone_numbers))

    # DynamoDB batch get (max 100 items per request)
    for start in range(0, len(unique_numbers), 100):
        chunk = unique_numbers[start : start + 100]
        pending = {self.table_name: {"Keys": [{"phone_number": phone} for phone in chunk]}}

        try:
            for attempt in range(max_attempts):
                if attempt:
                    # Short exponential backoff before retrying leftovers.
                    time.sleep(0.05 * 2 ** (attempt - 1))

                response = self.dynamodb.batch_get_item(RequestItems=pending)

                for item in response.get("Responses", {}).get(self.table_name, []):
                    phone = str(item["phone_number"])
                    results[phone] = PhoneVerification(
                        phone_number=phone,
                        line_type=LineType(str(item["line_type"])),
                        dnc=bool(item["dnc"]),
                        cached=True,
                        verified_at=datetime.fromisoformat(str(item["verified_at"])),
                        source=VerificationSource.CACHE,
                    )

                # A batch read may return only part of the requested keys;
                # the remainder comes back in UnprocessedKeys, already shaped
                # as the next RequestItems payload.
                pending = response.get("UnprocessedKeys") or {}
                if not pending:
                    break
            else:
                logger.warning(
                    f"Batch cache get left keys unprocessed after {max_attempts} attempts"
                )

        except Exception as e:
            logger.error(f"Batch cache get error: {str(e)}")

    # Fill in None for misses
    for phone in phone_numbers:
        results.setdefault(phone, None)

    return results
Get multiple cached results
def compute_key_hash(key: str) -> str:
    """Return the hex-encoded SHA-256 digest of an API key.

    Args:
        key: The full API key string; it is encoded with the default
            (UTF-8) codec before hashing.

    Returns:
        The 64-character hexadecimal digest.
    """
    hasher = hashlib.sha256()
    hasher.update(key.encode())
    return hasher.hexdigest()
Compute the SHA-256 hash of an API key.
Args: key: The full API key string.
Returns: Hex digest of the SHA-256 hash.
def generate_key_id() -> str:
    """Create a fresh random key ID.

    The ID is ``KEY_ID_PREFIX`` followed by 24 hexadecimal characters
    (12 random bytes, i.e. 96 bits) drawn from :func:`secrets.token_hex`.
    """
    random_part = secrets.token_hex(12)
    return f"{KEY_ID_PREFIX}{random_part}"
Generate a unique key ID with mk_ prefix.
Returns a key ID in the format mk_<24 hex chars> using 96-bit
entropy via secrets.token_hex().
def generate_managed_key() -> str:
    """Create a fresh random managed API key.

    The key is ``MANAGED_KEY_PREFIX`` followed by 40 hexadecimal characters
    (20 random bytes, i.e. 160 bits) drawn from :func:`secrets.token_hex`.
    """
    random_part = secrets.token_hex(20)
    return f"{MANAGED_KEY_PREFIX}{random_part}"
Generate a new managed API key.
Returns a key in the format lls_mk_<40 hex chars> using 160-bit
entropy via secrets.token_hex().
def validate_expiration_days(days: int) -> bool:
    """Check whether an expiration period lies inside the permitted window.

    Args:
        days: Number of days until key expiration.

    Returns:
        ``True`` when *days* is within ``MIN_EXPIRATION_DAYS`` and
        ``MAX_EXPIRATION_DAYS`` (both inclusive), ``False`` otherwise.
    """
    # Guard clause for the lower bound, then test the upper bound.
    if not MIN_EXPIRATION_DAYS <= days:
        return False
    return days <= MAX_EXPIRATION_DAYS
Validate that an expiration period is within the allowed range.
Args: days: Number of days until key expiration.
Returns:
True if days is between 1 and 730 inclusive, False otherwise.
class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``Decimal`` values as plain numbers."""

    def default(self, obj: object) -> object:
        """Convert integral Decimals to int and all other Decimals to float.

        Anything that is not a Decimal is delegated to the base class, which
        raises TypeError for unsupported types.
        """
        if not isinstance(obj, Decimal):
            return super().default(obj)
        if obj % 1 == 0:
            return int(obj)
        return float(obj)
JSON encoder that handles DynamoDB Decimal types.
def default(self, obj: object) -> object:
    """Convert integral Decimals to int and all other Decimals to float.

    Anything that is not a Decimal is delegated to the base class, which
    raises TypeError for unsupported types.
    """
    if not isinstance(obj, Decimal):
        return super().default(obj)
    has_fraction = obj % 1 != 0
    return float(obj) if has_fraction else int(obj)
Implement this method in a subclass such that it returns
a serializable object for o, or calls the base implementation
(to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this::
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return super().default(o)
def extract_area_code(phone: str) -> str:
    """Return the 3-digit area code found in *phone*.

    All non-digit characters are discarded first, so both E.164 input
    (``+1XXXXXXXXXX``) and raw digit strings are accepted. A leading ``1``
    is skipped when at least four digits are present.

    Returns:
        The three area-code digits, or ``'unknown'`` when fewer than three
        digits are available.
    """
    numeric = "".join(filter(str.isdigit, phone))
    has_country_code = numeric.startswith("1") and len(numeric) >= 4
    if has_country_code:
        return numeric[1:4]
    return numeric[:3] if len(numeric) >= 3 else "unknown"
Extract 3-digit area code from a phone number string.
Handles E.164 format (+1XXXXXXXXXX) and raw digits. Returns 'unknown' if fewer than 3 digits.
46class ManagedApiKeyService: 47 """Manages user API keys with CRUD operations in DynamoDB. 48 49 DynamoDB table schema: 50 - Hash key: user_id (S) 51 - Range key: key_id (S) 52 """ 53 54 table: "Table | None" 55 56 def __init__(self, table_name: str | None = None): 57 """Initialize with DynamoDB table.""" 58 if not HAS_BOTO3 or not boto3: 59 raise RuntimeError("boto3 is required for ManagedApiKeyService") 60 61 self.dynamodb = boto3.resource("dynamodb") 62 self.table_name = table_name if table_name else os.environ["MANAGED_API_KEYS_TABLE"] 63 64 try: 65 self.table = self.dynamodb.Table(self.table_name) 66 except Exception as e: 67 logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}") 68 self.table = None 69 70 def _get_key(self, user_id: str, key_id: str) -> dict[str, Any]: 71 """Fetch a key item, raising if not found or revoked.""" 72 if not self.table: 73 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 74 75 response = self.table.get_item(Key={"user_id": user_id, "key_id": key_id}) 76 item = response.get("Item") 77 if not item: 78 raise KeyNotFoundError(f"Key {key_id} not found for user {user_id}") 79 if item.get("status") == "revoked": 80 raise RevokedKeyError(f"Key {key_id} is revoked") 81 return item 82 83 def list_keys(self, user_id: str) -> list[dict[str, Any]]: 84 """List all API keys for a user, sorted by created_at descending. 85 86 Returns projected fields only (excludes key_hash). 
87 """ 88 if not self.table: 89 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 90 91 response = self.table.query( 92 KeyConditionExpression="user_id = :uid", 93 ExpressionAttributeValues={":uid": user_id}, 94 ) 95 items = response.get("Items", []) 96 97 result = [] 98 for item in items: 99 result.append( 100 { 101 "key_id": item["key_id"], 102 "key_last4": item.get("key_last4", ""), 103 "label": item.get("label", ""), 104 "status": item.get("status", "active"), 105 "created_at": item.get("created_at", ""), 106 "expires_at": item.get("expires_at"), 107 "last_used_at": item.get("last_used_at"), 108 } 109 ) 110 111 result.sort(key=lambda x: str(x.get("created_at", "")), reverse=True) 112 return result 113 114 def create_key(self, user_id: str, label: str, expires_in_days: int = 365) -> dict[str, Any]: 115 """Create a new managed API key. 116 117 Returns the key_id and plaintext key (only time key is returned). 118 """ 119 if not self.table: 120 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 121 122 # Validate label 123 label = label.strip() 124 if not label or len(label) > MAX_LABEL_LENGTH: 125 raise ValueError(f"Label must be 1-{MAX_LABEL_LENGTH} characters, got {len(label)}") 126 127 # Validate expiration 128 if not validate_expiration_days(expires_in_days): 129 raise ValueError(f"Expiration must be 1-730 days, got {expires_in_days}") 130 131 # Check active key count 132 existing = self.list_keys(user_id) 133 active_count = sum(1 for k in existing if k["status"] != "revoked") 134 if active_count >= MAX_ACTIVE_KEYS: 135 raise LimitExceededError(f"Maximum of {MAX_ACTIVE_KEYS} active keys reached") 136 137 # Generate key 138 key_id = generate_key_id() 139 plaintext_key = generate_managed_key() 140 key_hash = compute_key_hash(plaintext_key) 141 now = datetime.now(UTC).isoformat() 142 expires_at = (datetime.now(UTC) + timedelta(days=expires_in_days)).isoformat() 143 144 self.table.put_item( 145 Item={ 146 "user_id": 
user_id, 147 "key_id": key_id, 148 "key_hash": key_hash, 149 "key_last4": plaintext_key[-4:], 150 "label": label, 151 "status": "active", 152 "created_at": now, 153 "expires_at": expires_at, 154 "last_used_at": None, 155 "usage_count": 0, 156 } 157 ) 158 159 logger.info(f"Created managed key {key_id} for user {user_id}") 160 return { 161 "key_id": key_id, 162 "api_key": plaintext_key, 163 "label": label, 164 "expires_at": expires_at, 165 } 166 167 def update_key( 168 self, 169 user_id: str, 170 key_id: str, 171 label: str | None = None, 172 expires_in_days: int | None = None, 173 ) -> dict[str, Any]: 174 """Update key label and/or expiration.""" 175 if label is None and expires_in_days is None: 176 raise ValueError("At least one of label or expires_in_days must be provided") 177 178 # This will raise KeyNotFoundError or RevokedKeyError 179 self._get_key(user_id, key_id) 180 181 update_parts = ["SET updated_at = :now"] 182 expr_values: dict[str, Any] = {":now": datetime.now(UTC).isoformat()} 183 184 if label is not None: 185 label = label.strip() 186 if not label or len(label) > MAX_LABEL_LENGTH: 187 raise ValueError(f"Label must be 1-{MAX_LABEL_LENGTH} characters") 188 update_parts.append("label = :label") 189 expr_values[":label"] = label 190 191 if expires_in_days is not None: 192 if not validate_expiration_days(expires_in_days): 193 raise ValueError(f"Expiration must be 1-730 days, got {expires_in_days}") 194 expires_at = (datetime.now(UTC) + timedelta(days=expires_in_days)).isoformat() 195 update_parts.append("expires_at = :expires_at") 196 expr_values[":expires_at"] = expires_at 197 198 update_expr = update_parts[0] 199 if len(update_parts) > 1: 200 update_expr += ", " + ", ".join(update_parts[1:]) 201 202 if not self.table: 203 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 204 205 self.table.update_item( 206 Key={"user_id": user_id, "key_id": key_id}, 207 UpdateExpression=update_expr, 208 ExpressionAttributeValues=expr_values, 209 ) 
210 211 logger.info(f"Updated managed key {key_id} for user {user_id}") 212 return {"message": "Key updated"} 213 214 def rotate_key(self, user_id: str, key_id: str) -> dict[str, Any]: 215 """Generate a new key value while keeping the same key_id. 216 217 Returns the new plaintext key (only time it's returned). 218 """ 219 # This will raise KeyNotFoundError or RevokedKeyError 220 self._get_key(user_id, key_id) 221 222 plaintext_key = generate_managed_key() 223 key_hash = compute_key_hash(plaintext_key) 224 now = datetime.now(UTC).isoformat() 225 226 if not self.table: 227 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 228 229 self.table.update_item( 230 Key={"user_id": user_id, "key_id": key_id}, 231 UpdateExpression=("SET key_hash = :hash, key_last4 = :last4, updated_at = :now"), 232 ExpressionAttributeValues={ 233 ":hash": key_hash, 234 ":last4": plaintext_key[-4:], 235 ":now": now, 236 }, 237 ) 238 239 logger.info(f"Rotated managed key {key_id} for user {user_id}") 240 return { 241 "key_id": key_id, 242 "api_key": plaintext_key, 243 "label": "", 244 "expires_at": "", 245 } 246 247 def revoke_key(self, user_id: str, key_id: str) -> None: 248 """Mark a key as revoked with TTL for automatic cleanup.""" 249 if not self.table: 250 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 251 252 # Check key exists (but allow revoking already-revoked keys) 253 response = self.table.get_item(Key={"user_id": user_id, "key_id": key_id}) 254 if not response.get("Item"): 255 raise KeyNotFoundError(f"Key {key_id} not found for user {user_id}") 256 257 now = datetime.now(UTC) 258 ttl = int((now + timedelta(days=REVOKE_TTL_DAYS)).timestamp()) 259 260 self.table.update_item( 261 Key={"user_id": user_id, "key_id": key_id}, 262 UpdateExpression=("SET #s = :revoked, revoked_at = :now, #ttl = :ttl"), 263 ExpressionAttributeNames={"#s": "status", "#ttl": "ttl"}, 264 ExpressionAttributeValues={ 265 ":revoked": "revoked", 266 ":now": 
now.isoformat(), 267 ":ttl": ttl, 268 }, 269 ) 270 271 logger.info(f"Revoked managed key {key_id} for user {user_id}")
Manages user API keys with CRUD operations in DynamoDB.
DynamoDB table schema: - Hash key: user_id (S) - Range key: key_id (S)
def __init__(self, table_name: str | None = None):
    """Bind the service to its DynamoDB table.

    Args:
        table_name: Table to use. When falsy, the name is read from the
            ``MANAGED_API_KEYS_TABLE`` environment variable.

    Raises:
        RuntimeError: If boto3 is not available.
        KeyError: If no name is given and the environment variable is
            not set.
    """
    if not (HAS_BOTO3 and boto3):
        raise RuntimeError("boto3 is required for ManagedApiKeyService")

    self.dynamodb = boto3.resource("dynamodb")
    self.table_name = table_name or os.environ["MANAGED_API_KEYS_TABLE"]

    try:
        self.table = self.dynamodb.Table(self.table_name)
    except Exception as exc:
        # Best effort: log and leave the handle unset rather than fail here.
        logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {exc}")
        self.table = None
Initialize with DynamoDB table.
def list_keys(self, user_id: str) -> list[dict[str, Any]]:
    """List all API keys for a user, sorted by created_at descending.

    Returns projected fields only (excludes key_hash).

    Args:
        user_id: Partition key value whose keys are listed.

    Raises:
        RuntimeError: If the table handle is unavailable.
    """
    if not self.table:
        raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")

    query_kwargs: dict[str, Any] = {
        "KeyConditionExpression": "user_id = :uid",
        "ExpressionAttributeValues": {":uid": user_id},
    }

    # A single Query call may return only part of the result set; keep
    # following LastEvaluatedKey until the whole partition has been read.
    items: list[dict[str, Any]] = []
    while True:
        response = self.table.query(**query_kwargs)
        items.extend(response.get("Items", []))
        last_key = response.get("LastEvaluatedKey")
        if not last_key:
            break
        query_kwargs["ExclusiveStartKey"] = last_key

    result = [
        {
            "key_id": item["key_id"],
            "key_last4": item.get("key_last4", ""),
            "label": item.get("label", ""),
            "status": item.get("status", "active"),
            "created_at": item.get("created_at", ""),
            "expires_at": item.get("expires_at"),
            "last_used_at": item.get("last_used_at"),
        }
        for item in items
    ]

    result.sort(key=lambda x: str(x.get("created_at", "")), reverse=True)
    return result
List all API keys for a user, sorted by created_at descending.
Returns projected fields only (excludes key_hash).
def create_key(self, user_id: str, label: str, expires_in_days: int = 365) -> dict[str, Any]:
    """Create a new managed API key.

    Returns the key_id and plaintext key (only time key is returned).

    Args:
        user_id: Owner of the new key.
        label: Human-readable name; surrounding whitespace is stripped.
        expires_in_days: Lifetime of the key in days.

    Raises:
        RuntimeError: If the table handle is unavailable.
        ValueError: If the stripped label is empty or too long, or the
            expiration is rejected by ``validate_expiration_days``.
        LimitExceededError: If the user already has ``MAX_ACTIVE_KEYS``
            non-revoked keys.
    """
    if not self.table:
        raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")

    # Validate label
    label = label.strip()
    if not label or len(label) > MAX_LABEL_LENGTH:
        raise ValueError(f"Label must be 1-{MAX_LABEL_LENGTH} characters, got {len(label)}")

    # Validate expiration
    if not validate_expiration_days(expires_in_days):
        raise ValueError(f"Expiration must be 1-730 days, got {expires_in_days}")

    # Check active key count (every non-revoked key counts)
    existing = self.list_keys(user_id)
    active_count = sum(1 for k in existing if k["status"] != "revoked")
    if active_count >= MAX_ACTIVE_KEYS:
        raise LimitExceededError(f"Maximum of {MAX_ACTIVE_KEYS} active keys reached")

    # Generate key; only the hash and the last four characters are stored
    key_id = generate_key_id()
    plaintext_key = generate_managed_key()
    key_hash = compute_key_hash(plaintext_key)

    # One clock reading so expires_at is exactly created_at + N days.
    created = datetime.now(UTC)
    now = created.isoformat()
    expires_at = (created + timedelta(days=expires_in_days)).isoformat()

    self.table.put_item(
        Item={
            "user_id": user_id,
            "key_id": key_id,
            "key_hash": key_hash,
            "key_last4": plaintext_key[-4:],
            "label": label,
            "status": "active",
            "created_at": now,
            "expires_at": expires_at,
            "last_used_at": None,
            "usage_count": 0,
        }
    )

    logger.info(f"Created managed key {key_id} for user {user_id}")
    return {
        "key_id": key_id,
        "api_key": plaintext_key,
        "label": label,
        "expires_at": expires_at,
    }
Create a new managed API key.
Returns the key_id and plaintext key (only time key is returned).
def update_key(
    self,
    user_id: str,
    key_id: str,
    label: str | None = None,
    expires_in_days: int | None = None,
) -> dict[str, Any]:
    """Change the label and/or the expiration of an existing key.

    Args:
        user_id: Owner of the key.
        key_id: Identifier of the key to modify.
        label: New label, or ``None`` to leave it unchanged.
        expires_in_days: New lifetime counted from now, or ``None`` to
            leave the expiration unchanged.

    Returns:
        ``{"message": "Key updated"}``.

    Raises:
        ValueError: If neither field is given or a given value is invalid.
        KeyNotFoundError: If the key does not exist.
        RevokedKeyError: If the key is revoked.
        RuntimeError: If the table handle is unavailable.
    """
    nothing_to_change = label is None and expires_in_days is None
    if nothing_to_change:
        raise ValueError("At least one of label or expires_in_days must be provided")

    # Existence / revocation check; the returned item itself is not needed.
    self._get_key(user_id, key_id)

    # updated_at is always written; the optional assignments follow it.
    assignments = ["updated_at = :now"]
    values: dict[str, Any] = {":now": datetime.now(UTC).isoformat()}

    if label is not None:
        cleaned = label.strip()
        if len(cleaned) == 0 or len(cleaned) > MAX_LABEL_LENGTH:
            raise ValueError(f"Label must be 1-{MAX_LABEL_LENGTH} characters")
        assignments.append("label = :label")
        values[":label"] = cleaned

    if expires_in_days is not None:
        if not validate_expiration_days(expires_in_days):
            raise ValueError(f"Expiration must be 1-730 days, got {expires_in_days}")
        new_expiry = datetime.now(UTC) + timedelta(days=expires_in_days)
        assignments.append("expires_at = :expires_at")
        values[":expires_at"] = new_expiry.isoformat()

    if not self.table:
        raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")

    self.table.update_item(
        Key={"user_id": user_id, "key_id": key_id},
        UpdateExpression="SET " + ", ".join(assignments),
        ExpressionAttributeValues=values,
    )

    logger.info(f"Updated managed key {key_id} for user {user_id}")
    return {"message": "Key updated"}
Update key label and/or expiration.
def rotate_key(self, user_id: str, key_id: str) -> dict[str, Any]:
    """Generate a new key value while keeping the same key_id.

    Returns the new plaintext key (only time it's returned) together with
    the key's stored label and expiration.

    Args:
        user_id: Owner of the key.
        key_id: Identifier of the key whose secret is replaced.

    Raises:
        KeyNotFoundError: If the key does not exist.
        RevokedKeyError: If the key is revoked.
        RuntimeError: If the table handle is unavailable.
    """
    # This will raise KeyNotFoundError or RevokedKeyError
    item = self._get_key(user_id, key_id)

    plaintext_key = generate_managed_key()
    key_hash = compute_key_hash(plaintext_key)
    now = datetime.now(UTC).isoformat()

    if not self.table:
        raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")

    # Only the hash and the last four characters are written.
    self.table.update_item(
        Key={"user_id": user_id, "key_id": key_id},
        UpdateExpression=("SET key_hash = :hash, key_last4 = :last4, updated_at = :now"),
        ExpressionAttributeValues={
            ":hash": key_hash,
            ":last4": plaintext_key[-4:],
            ":now": now,
        },
    )

    logger.info(f"Rotated managed key {key_id} for user {user_id}")
    return {
        "key_id": key_id,
        "api_key": plaintext_key,
        # Rotation leaves label and expiry untouched, so report the stored
        # values; fall back to "" when an attribute is absent.
        "label": item.get("label") or "",
        "expires_at": item.get("expires_at") or "",
    }
Generate a new key value while keeping the same key_id.
Returns the new plaintext key (only time it's returned).
def revoke_key(self, user_id: str, key_id: str) -> None:
    """Flag a key as revoked and schedule it for automatic cleanup.

    The item's ``status`` becomes ``"revoked"``, ``revoked_at`` records the
    current UTC time, and ``ttl`` is set ``REVOKE_TTL_DAYS`` days ahead as
    an epoch timestamp. Revoking an already revoked key is allowed.

    Raises:
        RuntimeError: If the table handle is unavailable.
        KeyNotFoundError: If the key does not exist.
    """
    if not self.table:
        raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")

    # Existence check only: the item's current status is deliberately ignored.
    lookup = self.table.get_item(Key={"user_id": user_id, "key_id": key_id})
    existing = lookup.get("Item")
    if not existing:
        raise KeyNotFoundError(f"Key {key_id} not found for user {user_id}")

    revoked_at = datetime.now(UTC)
    purge_at = revoked_at + timedelta(days=REVOKE_TTL_DAYS)

    # "status" and "ttl" are referenced through name placeholders.
    attribute_names = {"#s": "status", "#ttl": "ttl"}
    attribute_values = {
        ":revoked": "revoked",
        ":now": revoked_at.isoformat(),
        ":ttl": int(purge_at.timestamp()),
    }

    self.table.update_item(
        Key={"user_id": user_id, "key_id": key_id},
        UpdateExpression=("SET #s = :revoked, revoked_at = :now, #ttl = :ttl"),
        ExpressionAttributeNames=attribute_names,
        ExpressionAttributeValues=attribute_values,
    )

    logger.info(f"Revoked managed key {key_id} for user {user_id}")
Mark a key as revoked with TTL for automatic cleanup.
Raised when a managed API key is not found.
Raised when attempting to modify a revoked key.
Raised when the active key limit is reached.
5class ProviderError(ValueError): 6 """Raised when an upstream verification provider fails. 7 8 Extends ValueError for backward compatibility with existing 9 error handlers. Carries an optional HTTP status code from the 10 upstream response so callers can distinguish client errors from 11 server/network failures. 12 """ 13 14 def __init__(self, message: str, status_code: int | None = None): 15 super().__init__(message) 16 self.status_code = status_code
Raised when an upstream verification provider fails.
Extends ValueError for backward compatibility with existing error handlers. Carries an optional HTTP status code from the upstream response so callers can distinguish client errors from server/network failures.
88class FileService: 89 """Manages file metadata listing with filtering and pagination. 90 91 DynamoDB table schema: 92 - Hash key: file_id (S) 93 - GSI: user-index on user_id (HASH) / created_at (RANGE) 94 """ 95 96 table: "Table | None" 97 98 def __init__(self, table_name: str | None = None): 99 """Initialize with DynamoDB table.""" 100 if not HAS_BOTO3 or not boto3: 101 raise RuntimeError("boto3 is required for FileService") 102 103 self.dynamodb = boto3.resource("dynamodb") 104 self.table_name = table_name if table_name else os.environ["FILES_TABLE"] 105 106 try: 107 self.table = self.dynamodb.Table(self.table_name) 108 except Exception as e: 109 logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}") 110 self.table = None 111 112 def _query_all_user_files(self, user_id: str) -> list[dict[str, Any]]: 113 """Query all files for a user via the user-index GSI.""" 114 if not self.table: 115 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 116 117 items: list[dict[str, Any]] = [] 118 kwargs: dict[str, Any] = { 119 "IndexName": "user-index", 120 "KeyConditionExpression": "user_id = :uid", 121 "ExpressionAttributeValues": {":uid": user_id}, 122 } 123 124 while True: 125 response = self.table.query(**kwargs) 126 items.extend(response.get("Items", [])) 127 last_key = response.get("LastEvaluatedKey") 128 if not last_key: 129 break 130 kwargs["ExclusiveStartKey"] = last_key 131 132 return items 133 134 def list_user_files( 135 self, 136 user_id: str, 137 page: int = 1, 138 limit: int = 20, 139 search: str = "", 140 sort_by: str = "created_at", 141 sort_order: str = "desc", 142 status_filter: str = "", 143 ) -> dict[str, Any]: 144 """List files for a user with filtering, sorting, and pagination. 145 146 Args: 147 user_id: User identifier. 148 page: Page number (1-indexed). 149 limit: Items per page. 150 search: Case-insensitive substring match on filename. 151 sort_by: Column to sort by (from ALLOWED_SORT_COLUMNS). 
152 sort_order: 'asc' or 'desc'. 153 status_filter: Filter by file status. 154 155 Returns: 156 Dict with files list and pagination metadata. 157 """ 158 items = self._query_all_user_files(user_id) 159 160 # Apply status filter 161 if status_filter and status_filter in ALLOWED_STATUSES: 162 items = [i for i in items if i.get("status") == status_filter] 163 164 # Apply search filter 165 if search: 166 search_lower = search.lower() 167 items = [i for i in items if search_lower in _get_filename(i).lower()] 168 169 # Sort 170 sort_key = SORT_KEY_MAP.get(sort_by, SORT_KEY_MAP["created_at"]) 171 reverse = sort_order != "asc" 172 items.sort(key=sort_key, reverse=reverse) 173 174 # Pagination 175 total_count = len(items) 176 total_pages = max(1, math.ceil(total_count / limit)) 177 start = (page - 1) * limit 178 end = start + limit 179 page_items = items[start:end] 180 181 return { 182 "files": [_map_file_fields(item) for item in page_items], 183 "page": page, 184 "per_page": limit, 185 "has_more": end < total_count, 186 "total_pages": total_pages, 187 "total_files_count": total_count, 188 } 189 190 def list_user_files_admin( 191 self, 192 user_id: str, 193 page: int = 1, 194 limit: int = 20, 195 ) -> dict[str, Any]: 196 """Admin view of user files with raw S3 keys. 197 198 Returns raw S3 keys for the handler to generate presigned URLs. 199 """ 200 items = self._query_all_user_files(user_id) 201 202 # Sort newest first 203 items.sort(key=lambda i: i.get("created_at", ""), reverse=True) 204 205 total = len(items) 206 start = (page - 1) * limit 207 end = start + limit 208 page_items = items[start:end] 209 210 return { 211 "files": [_map_admin_fields(item) for item in page_items], 212 "total": total, 213 "page": page, 214 "limit": limit, 215 }
Manages file metadata listing with filtering and pagination.
DynamoDB table schema: - Hash key: file_id (S) - GSI: user-index on user_id (HASH) / created_at (RANGE)
def __init__(self, table_name: str | None = None):
    """Bind the service to its DynamoDB table.

    Args:
        table_name: Table to use. When falsy, the name is read from the
            ``FILES_TABLE`` environment variable.

    Raises:
        RuntimeError: If boto3 is not available.
        KeyError: If no name is given and the environment variable is
            not set.
    """
    if not (HAS_BOTO3 and boto3):
        raise RuntimeError("boto3 is required for FileService")

    self.dynamodb = boto3.resource("dynamodb")
    self.table_name = table_name or os.environ["FILES_TABLE"]

    try:
        self.table = self.dynamodb.Table(self.table_name)
    except Exception as exc:
        # Best effort: log and leave the handle unset rather than fail here.
        logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {exc}")
        self.table = None
Initialize with DynamoDB table.
def list_user_files(
    self,
    user_id: str,
    page: int = 1,
    limit: int = 20,
    search: str = "",
    sort_by: str = "created_at",
    sort_order: str = "desc",
    status_filter: str = "",
) -> dict[str, Any]:
    """List files for a user with filtering, sorting, and pagination.

    Args:
        user_id: User identifier.
        page: Page number (1-indexed); values below 1 are treated as 1.
        limit: Items per page; values below 1 are treated as 1.
        search: Case-insensitive substring match on filename.
        sort_by: Column to sort by (from ALLOWED_SORT_COLUMNS); unknown
            names fall back to ``created_at``.
        sort_order: 'asc' or 'desc'; anything other than 'asc' sorts
            descending.
        status_filter: Filter by file status; ignored unless it is one of
            ALLOWED_STATUSES.

    Returns:
        Dict with files list and pagination metadata.
    """
    # Clamp up front: limit=0 would divide by zero below and page <= 0
    # would produce negative slice bounds that select the wrong rows.
    page = max(1, page)
    limit = max(1, limit)

    items = self._query_all_user_files(user_id)

    # Apply status filter
    if status_filter and status_filter in ALLOWED_STATUSES:
        items = [i for i in items if i.get("status") == status_filter]

    # Apply search filter
    if search:
        search_lower = search.lower()
        items = [i for i in items if search_lower in _get_filename(i).lower()]

    # Sort
    sort_key = SORT_KEY_MAP.get(sort_by, SORT_KEY_MAP["created_at"])
    reverse = sort_order != "asc"
    items.sort(key=sort_key, reverse=reverse)

    # Pagination
    total_count = len(items)
    total_pages = max(1, math.ceil(total_count / limit))
    start = (page - 1) * limit
    end = start + limit
    page_items = items[start:end]

    return {
        "files": [_map_file_fields(item) for item in page_items],
        "page": page,
        "per_page": limit,
        "has_more": end < total_count,
        "total_pages": total_pages,
        "total_files_count": total_count,
    }
List files for a user with filtering, sorting, and pagination.
Args: user_id: User identifier. page: Page number (1-indexed). limit: Items per page. search: Case-insensitive substring match on filename. sort_by: Column to sort by (from ALLOWED_SORT_COLUMNS). sort_order: 'asc' or 'desc'. status_filter: Filter by file status.
Returns: Dict with files list and pagination metadata.
def list_user_files_admin(
    self,
    user_id: str,
    page: int = 1,
    limit: int = 20,
) -> dict[str, Any]:
    """Admin view of user files with raw S3 keys.

    Returns raw S3 keys for the handler to generate presigned URLs.

    Args:
        user_id: User identifier.
        page: Page number (1-indexed); values below 1 are treated as 1.
        limit: Items per page; values below 1 are treated as 1.

    Returns:
        Dict with the mapped ``files`` of the requested page, the ``total``
        number of files, and the effective ``page`` and ``limit``.
    """
    # Clamp up front: page <= 0 or a negative limit would produce negative
    # slice bounds that select the wrong rows.
    page = max(1, page)
    limit = max(1, limit)

    items = self._query_all_user_files(user_id)

    # Sort newest first
    items.sort(key=lambda i: i.get("created_at", ""), reverse=True)

    total = len(items)
    start = (page - 1) * limit
    end = start + limit
    page_items = items[start:end]

    return {
        "files": [_map_admin_fields(item) for item in page_items],
        "total": total,
        "page": page,
        "limit": limit,
    }
Admin view of user files with raw S3 keys.
Returns raw S3 keys for the handler to generate presigned URLs.