ai_lls_lib

AI LLS Library - Core business logic for Landline Scrubber.

This library provides phone verification and DNC checking capabilities.

Version 2.0.0 established a clean semantic-versioning baseline; since then, all version management has been controlled by Python Semantic Release.

Dependencies optimized for Lambda deployment (removed unused pandas/pyarrow).

 1"""
 2AI LLS Library - Core business logic for Landline Scrubber.
 3
 4This library provides phone verification and DNC checking capabilities.
 5
 6Version 2.0.0 establishes clean semantic versioning baseline.
 7All version management now controlled by Python Semantic Release.
 8
 9Dependencies optimized for Lambda deployment (removed unused pandas/pyarrow).
10"""
11
12from ai_lls_lib.apikeys import (
13    KeyNotFoundError,
14    LimitExceededError,
15    ManagedApiKeyService,
16    RevokedKeyError,
17)
18from ai_lls_lib.common import DecimalEncoder, extract_area_code
19from ai_lls_lib.core.cache import DynamoDBCache
20from ai_lls_lib.core.models import (
21    BulkJob,
22    BulkJobStatus,
23    JobStatus,
24    LineType,
25    PhoneVerification,
26    VerificationSource,
27)
28from ai_lls_lib.core.processor import BulkProcessor
29from ai_lls_lib.core.verifier import PhoneVerifier
30from ai_lls_lib.files import FileService
31from ai_lls_lib.key_management import (
32    compute_key_hash,
33    generate_key_id,
34    generate_managed_key,
35    validate_expiration_days,
36)
37from ai_lls_lib.providers.exceptions import ProviderError
38
39__version__ = "3.13.0"
40
41__all__ = [
42    "PhoneVerification",
43    "BulkJob",
44    "BulkJobStatus",
45    "LineType",
46    "VerificationSource",
47    "JobStatus",
48    "PhoneVerifier",
49    "BulkProcessor",
50    "DynamoDBCache",
51    "compute_key_hash",
52    "generate_key_id",
53    "generate_managed_key",
54    "validate_expiration_days",
55    "DecimalEncoder",
56    "extract_area_code",
57    "ManagedApiKeyService",
58    "KeyNotFoundError",
59    "RevokedKeyError",
60    "LimitExceededError",
61    "ProviderError",
62    "FileService",
63]
class PhoneVerification(pydantic.main.BaseModel):
class PhoneVerification(BaseModel):
    """Result of phone number verification"""

    # All six fields are required (explicit `default=...` sentinel). The
    # docstring above and the descriptions below feed the generated schema,
    # so they are intentionally left verbatim.
    phone_number: str = Field(default=..., description="E.164 formatted phone number")
    line_type: LineType = Field(default=..., description="Type of phone line")
    dnc: bool = Field(default=..., description="Whether number is on DNC list")
    cached: bool = Field(default=..., description="Whether result came from cache")
    verified_at: datetime = Field(default=..., description="When verification occurred")
    source: VerificationSource = Field(
        default=..., description="Source of verification data"
    )

    class Config:
        # JSON output renders datetimes through their own isoformat().
        # NOTE(review): `class Config` / `json_encoders` are deprecated under
        # Pydantic v2 (the rendered docs show PydanticUndefined, i.e. v2) —
        # consider `model_config` plus a field serializer.
        json_encoders = {datetime: lambda stamp: stamp.isoformat()}

Result of phone number verification

phone_number: str = PydanticUndefined

E.164 formatted phone number

line_type: LineType = PydanticUndefined

Type of phone line

dnc: bool = PydanticUndefined

Whether number is on DNC list

cached: bool = PydanticUndefined

Whether result came from cache

verified_at: datetime.datetime = PydanticUndefined

When verification occurred

source: VerificationSource = PydanticUndefined

Source of verification data

class PhoneVerification.Config:
class Config:
    # Model-level Pydantic config: datetime values are encoded by calling the
    # instance's own isoformat() when the model is serialized to JSON.
    json_encoders = {datetime: lambda stamp: stamp.isoformat()}
json_encoders = {<class 'datetime.datetime'>: <function PhoneVerification.Config.<lambda>>}
class BulkJob(pydantic.main.BaseModel):
class BulkJob(BaseModel):
    """Bulk processing job metadata"""

    # Both fields are required (explicit `default=...` sentinel); the
    # docstring and descriptions are kept verbatim because they feed the schema.
    job_id: str = Field(default=..., description="Unique job identifier")
    status: JobStatus = Field(default=..., description="Current job status")

Bulk processing job metadata

job_id: str = PydanticUndefined

Unique job identifier

status: JobStatus = PydanticUndefined

Current job status

class BulkJobStatus(ai_lls_lib.BulkJob):
class BulkJobStatus(BulkJob):
    """Extended bulk job status with progress info"""

    # Optional progress fields (default None).
    total_rows: int | None = Field(default=None, description="Total rows to process")
    processed_rows: int | None = Field(default=None, description="Rows processed so far")
    result_url: str | None = Field(default=None, description="S3 URL of results")
    # The only required field added on top of BulkJob.
    created_at: datetime = Field(default=..., description="Job creation time")
    # Optional completion / failure details (default None).
    completed_at: datetime | None = Field(default=None, description="Job completion time")
    error: str | None = Field(default=None, description="Error message if failed")

Extended bulk job status with progress info

total_rows: int | None = None

Total rows to process

processed_rows: int | None = None

Rows processed so far

result_url: str | None = None

S3 URL of results

created_at: datetime.datetime = PydanticUndefined

Job creation time

completed_at: datetime.datetime | None = None

Job completion time

error: str | None = None

Error message if failed

class LineType(enum.StrEnum):
12class LineType(StrEnum):
13    """Phone line type enumeration"""
14
15    MOBILE = "mobile"
16    LANDLINE = "landline"
17    VOIP = "voip"
18    UNKNOWN = "unknown"

Phone line type enumeration

MOBILE = <LineType.MOBILE: 'mobile'>
LANDLINE = <LineType.LANDLINE: 'landline'>
VOIP = <LineType.VOIP: 'voip'>
UNKNOWN = <LineType.UNKNOWN: 'unknown'>
class VerificationSource(enum.StrEnum):
21class VerificationSource(StrEnum):
22    """Source of verification data"""
23
24    API = "api"
25    CACHE = "cache"
26    BULK_IMPORT = "bulk_import"

Source of verification data

API = <VerificationSource.API: 'api'>
CACHE = <VerificationSource.CACHE: 'cache'>
BULK_IMPORT = <VerificationSource.BULK_IMPORT: 'bulk_import'>
class JobStatus(enum.StrEnum):
29class JobStatus(StrEnum):
30    """Bulk job status enumeration"""
31
32    PENDING = "pending"
33    PROCESSING = "processing"
34    COMPLETED = "completed"
35    FAILED = "failed"

Bulk job status enumeration

PENDING = <JobStatus.PENDING: 'pending'>
PROCESSING = <JobStatus.PROCESSING: 'processing'>
COMPLETED = <JobStatus.COMPLETED: 'completed'>
FAILED = <JobStatus.FAILED: 'failed'>
class PhoneVerifier:
class PhoneVerifier:
    """Determine line type and DNC status for phone numbers.

    Inputs are normalized to E.164 first. When a cache is configured it is
    consulted before the provider and updated (best-effort) afterwards.
    """

    def __init__(
        self, cache: DynamoDBCache | None = None, provider: VerificationProvider | None = None
    ):
        """
        Build a verifier.

        Args:
            cache: Optional DynamoDB cache; ``verify`` reads from it first and
                writes provider results back to it.
            provider: Backend answering ``verify_phone``; any falsy value
                (including None) is replaced by a new ExternalAPIProvider.
        """
        self.cache = cache
        self.provider = provider if provider else ExternalAPIProvider()
        logger.debug("PhoneVerifier initialized")

    def normalize_phone(self, phone: str) -> str:
        """
        Convert *phone* to E.164, parsing with US as the default region.

        Raises:
            ValueError: if the input cannot be parsed or is not a valid
                number; the underlying error is chained as ``__cause__``.
        """
        try:
            parsed_number = phonenumbers.parse(phone, "US")
            if phonenumbers.is_valid_number(parsed_number):
                return phonenumbers.format_number(
                    parsed_number, phonenumbers.PhoneNumberFormat.E164
                )
            raise ValueError(f"Invalid phone number: {phone}")
        except Exception as exc:
            # Every parse/validation failure surfaces as a single ValueError type.
            logger.error(f"Phone normalization failed: {str(exc)}")
            raise ValueError(f"Invalid phone format: {phone}") from exc

    def verify(self, phone: str) -> PhoneVerification:
        """
        Return line type and DNC status for *phone*.

        A truthy cache hit is returned unchanged. Otherwise the provider is
        queried and a new API-sourced result, timestamped now in UTC, is
        built and (best-effort) written to the cache.

        Raises:
            ValueError: if *phone* cannot be normalized.
        """
        e164 = self.normalize_phone(phone)

        hit = self.cache.get(e164) if self.cache else None
        if hit:
            return hit

        line_type, dnc_status = self.provider.verify_phone(e164)
        fresh = PhoneVerification(
            phone_number=e164,
            line_type=line_type,
            dnc=dnc_status,
            cached=False,
            verified_at=datetime.now(UTC),
            source=VerificationSource.API,
        )

        if not self.cache:
            return fresh

        try:
            self.cache.set(e164, fresh)
        except Exception as exc:
            # A failed cache write must not fail the verification itself.
            logger.warning(f"Failed to cache result: {exc}")
        return fresh

    def _check_line_type(self, phone: str) -> LineType:
        """Return only the provider's line type (kept for CLI backwards compatibility)."""
        line_type, _dnc = self.provider.verify_phone(phone)
        return line_type

    def _check_dnc(self, phone: str) -> bool:
        """Return only the provider's DNC flag (kept for CLI backwards compatibility)."""
        _line_type, dnc_status = self.provider.verify_phone(phone)
        return dnc_status

Verifies phone numbers for line type and DNC status

PhoneVerifier( cache: DynamoDBCache | None = None, provider: ai_lls_lib.providers.base.VerificationProvider | None = None)
def __init__(
    self, cache: DynamoDBCache | None = None, provider: VerificationProvider | None = None
):
    """
    Build a verifier.

    Args:
        cache: Optional DynamoDB cache; ``verify`` reads from it first and
            writes provider results back to it.
        provider: Backend answering ``verify_phone``; any falsy value
            (including None) is replaced by a new ExternalAPIProvider.
    """
    self.cache = cache
    self.provider = provider if provider else ExternalAPIProvider()
    logger.debug("PhoneVerifier initialized")

Initialize phone verifier.

Args: cache: Optional DynamoDB cache for storing results provider: Verification provider (defaults to ExternalAPIProvider)

cache
provider
def normalize_phone(self, phone: str) -> str:
def normalize_phone(self, phone: str) -> str:
    """
    Convert *phone* to E.164, parsing with US as the default region.

    Raises:
        ValueError: if the input cannot be parsed or is not a valid number;
            the underlying error is chained as ``__cause__``.
    """
    try:
        parsed_number = phonenumbers.parse(phone, "US")
        if phonenumbers.is_valid_number(parsed_number):
            return phonenumbers.format_number(
                parsed_number, phonenumbers.PhoneNumberFormat.E164
            )
        raise ValueError(f"Invalid phone number: {phone}")
    except Exception as exc:
        # Every parse/validation failure surfaces as a single ValueError type.
        logger.error(f"Phone normalization failed: {str(exc)}")
        raise ValueError(f"Invalid phone format: {phone}") from exc

Normalize phone to E.164 format

def verify(self, phone: str) -> PhoneVerification:
def verify(self, phone: str) -> PhoneVerification:
    """
    Return line type and DNC status for *phone*.

    A truthy cache hit is returned unchanged. Otherwise the provider is
    queried and a new API-sourced result, timestamped now in UTC, is built
    and (best-effort) written to the cache.

    Raises:
        ValueError: if *phone* cannot be normalized.
    """
    e164 = self.normalize_phone(phone)

    hit = self.cache.get(e164) if self.cache else None
    if hit:
        return hit

    line_type, dnc_status = self.provider.verify_phone(e164)
    fresh = PhoneVerification(
        phone_number=e164,
        line_type=line_type,
        dnc=dnc_status,
        cached=False,
        verified_at=datetime.now(UTC),
        source=VerificationSource.API,
    )

    if not self.cache:
        return fresh

    try:
        self.cache.set(e164, fresh)
    except Exception as exc:
        # A failed cache write must not fail the verification itself.
        logger.warning(f"Failed to cache result: {exc}")
    return fresh

Verify phone number for line type and DNC status

class BulkProcessor:
 18class BulkProcessor:
 19    """Process CSV files for bulk phone verification"""
 20
 21    def __init__(self, verifier: PhoneVerifier):
 22        self.verifier = verifier
 23
 24    def process_csv(self, csv_text: str, phone_column: str = "phone") -> list[PhoneVerification]:
 25        """
 26        Process CSV text content.
 27        Returns list of verification results.
 28        """
 29        results = []
 30
 31        try:
 32            # Strip UTF-8 BOM if present (Excel on Windows adds this)
 33            csv_text = csv_text.lstrip("\ufeff")
 34
 35            # Use StringIO to parse CSV text
 36            csv_file = StringIO(csv_text)
 37            reader = csv.DictReader(csv_file)
 38
 39            # Find phone column (case-insensitive)
 40            headers = reader.fieldnames or []
 41            phone_col = self._find_phone_column(headers, phone_column)
 42
 43            if not phone_col:
 44                raise ValueError(f"Phone column '{phone_column}' not found in CSV")
 45
 46            logger.info(f"Starting CSV processing using phone column '{phone_col}'")
 47
 48            for row_num, row in enumerate(reader, start=2):  # Start at 2 (header is 1)
 49                try:
 50                    phone = row.get(phone_col, "").strip()
 51                    if not phone:
 52                        logger.warning(f"Empty phone at row {row_num}")
 53                        continue
 54
 55                    # Verify phone
 56                    result = self.verifier.verify(phone)
 57                    results.append(result)
 58
 59                    # Log progress every 100 rows
 60                    if len(results) % 100 == 0:
 61                        logger.info(f"Processed {len(results)} phones (at row {row_num})")
 62
 63                except ValueError as e:
 64                    logger.warning(f"Invalid phone at row {row_num}: {str(e)}")
 65                    continue
 66                except Exception as e:
 67                    logger.error(f"Error processing row {row_num}: {str(e)}")
 68                    continue
 69
 70            logger.info(f"Completed processing {len(results)} valid phones")
 71
 72        except Exception as e:
 73            logger.error(f"CSV processing failed: {str(e)}")
 74            raise
 75
 76        return results
 77
 78    def _find_phone_column(self, headers: list[str] | Sequence[str], preferred: str) -> str | None:
 79        """Find phone column in headers (case-insensitive)"""
 80        # First try exact match
 81        for header in headers:
 82            if header.lower() == preferred.lower():
 83                return header
 84
 85        # Common phone column names
 86        phone_patterns = [
 87            "phone",
 88            "phone_number",
 89            "phonenumber",
 90            "mobile",
 91            "cell",
 92            "telephone",
 93            "tel",
 94            "number",
 95            "contact",
 96        ]
 97
 98        for header in headers:
 99            header_lower = header.lower()
100            for pattern in phone_patterns:
101                if pattern in header_lower:
102                    logger.info(f"Using column '{header}' as phone column")
103                    return header
104
105        return None
106
107    def generate_results_csv(self, original_csv_text: str, results: list[PhoneVerification]) -> str:
108        """
109        Generate CSV with original data plus verification results.
110        Adds columns: line_type, dnc
111        Returns CSV text string.
112        """
113        # Create lookup dict
114        results_map = {r.phone_number: r for r in results}
115
116        # Parse original CSV (strip UTF-8 BOM if present)
117        original_csv_text = original_csv_text.lstrip("\ufeff")
118        input_file = StringIO(original_csv_text)
119        reader = csv.DictReader(input_file)
120        headers = list(reader.fieldnames or [])
121
122        # Add new columns
123        output_headers = headers + ["line_type", "dnc"]
124
125        # Create output CSV in memory
126        output = StringIO()
127        writer = csv.DictWriter(output, fieldnames=output_headers)
128        writer.writeheader()
129
130        phone_col = self._find_phone_column(headers, "phone")
131
132        for row in reader:
133            phone = row.get(phone_col, "").strip()
134
135            # Try to normalize for lookup
136            try:
137                normalized = self.verifier.normalize_phone(phone)
138                if normalized in results_map:
139                    result = results_map[normalized]
140                    row["line_type"] = result.line_type.value
141                    row["dnc"] = "true" if result.dnc else "false"
142                else:
143                    row["line_type"] = "unknown"
144                    row["dnc"] = ""
145            except Exception:
146                row["line_type"] = "invalid"
147                row["dnc"] = ""
148
149            writer.writerow(row)
150
151        # Return CSV text
152        return output.getvalue()
153
154    def process_csv_stream(
155        self, lines: Iterable[str], phone_column: str = "phone", batch_size: int = 100
156    ) -> Iterator[list[PhoneVerification]]:
157        """
158        Process CSV lines as a stream, yielding batches of results.
159        Memory-efficient for large files.
160
161        Args:
162            lines: Iterator of CSV lines (including header)
163            phone_column: Column name containing phone numbers
164            batch_size: Number of results to accumulate before yielding
165
166        Yields:
167            Batches of PhoneVerification results
168        """
169        lines_list = list(lines)  # Need to iterate twice - once for headers, once for data
170
171        if not lines_list:
172            logger.error("Empty CSV stream")
173            return
174
175        # Parse header (strip UTF-8 BOM if present)
176        header_line = lines_list[0].lstrip("\ufeff")
177        reader = csv.DictReader(StringIO(header_line))
178        headers = reader.fieldnames or []
179        phone_col = self._find_phone_column(headers, phone_column)
180
181        if not phone_col:
182            raise ValueError(f"Phone column '{phone_column}' not found in CSV")
183
184        batch = []
185        row_num = 2  # Start at 2 (header is 1)
186        total_processed = 0
187
188        # Process data lines
189        for line in lines_list[1:]:
190            if not line.strip():
191                continue
192
193            try:
194                # Parse single line
195                row = next(csv.DictReader(StringIO(line), fieldnames=headers))
196                phone = row.get(phone_col, "").strip()
197
198                if not phone:
199                    logger.warning(f"Empty phone at row {row_num}")
200                    row_num += 1
201                    continue
202
203                # Verify phone
204                result = self.verifier.verify(phone)
205                batch.append(result)
206                total_processed += 1
207
208                # Yield batch when full
209                if len(batch) >= batch_size:
210                    logger.info(
211                        f"Processed batch of {len(batch)} phones (total: {total_processed}, at row {row_num})"
212                    )
213                    yield batch
214                    batch = []
215
216            except ValueError as e:
217                logger.warning(f"Invalid phone at row {row_num}: {str(e)}")
218            except Exception as e:
219                logger.error(f"Error processing row {row_num}: {str(e)}")
220            finally:
221                row_num += 1
222
223        # Yield remaining results
224        if batch:
225            logger.info(f"Processed final batch of {len(batch)} phones (total: {total_processed})")
226            yield batch
227
228        logger.info(f"Stream processing completed. Total processed: {total_processed}")
229
230    def generate_results_csv_stream(
231        self,
232        original_lines: Iterable[str],
233        results_stream: Iterator[list[PhoneVerification]],
234        phone_column: str = "phone",
235    ) -> Iterator[str]:
236        """
237        Generate CSV results as a stream, line by line.
238        Memory-efficient for large files.
239
240        Args:
241            original_lines: Iterator of original CSV lines
242            results_stream: Iterator of batched PhoneVerification results
243            phone_column: Column name containing phone numbers
244
245        Yields:
246            CSV lines with verification results added
247        """
248        lines_iter = iter(original_lines)
249
250        # Read and yield modified header
251        try:
252            header_line = next(lines_iter).lstrip("\ufeff")
253            reader = csv.DictReader(StringIO(header_line))
254            headers = list(reader.fieldnames or [])
255
256            # Add new columns
257            output_headers = headers + ["line_type", "dnc"]
258            yield ",".join(output_headers) + "\n"
259
260            phone_col = self._find_phone_column(headers, phone_column)
261
262        except StopIteration:
263            return
264
265        # Build results lookup from stream
266        results_map = {}
267        for batch in results_stream:
268            for result in batch:
269                results_map[result.phone_number] = result
270
271        # Reset lines iterator
272        lines_iter = iter(original_lines)
273        next(lines_iter)  # Skip header
274
275        # Process and yield data lines
276        for line in lines_iter:
277            if not line.strip():
278                continue
279
280            row = next(csv.DictReader(StringIO(line), fieldnames=headers))
281            phone = row.get(phone_col, "").strip()
282
283            # Add verification results
284            try:
285                normalized = self.verifier.normalize_phone(phone)
286                if normalized in results_map:
287                    result = results_map[normalized]
288                    row["line_type"] = result.line_type.value
289                    row["dnc"] = "true" if result.dnc else "false"
290                else:
291                    row["line_type"] = "unknown"
292                    row["dnc"] = ""
293            except Exception:
294                row["line_type"] = "invalid"
295                row["dnc"] = ""
296
297            # Write row
298            output = StringIO()
299            writer = csv.DictWriter(output, fieldnames=output_headers)
300            writer.writerow(row)
301            yield output.getvalue()

Process CSV files for bulk phone verification

BulkProcessor(verifier: PhoneVerifier)
def __init__(self, verifier: PhoneVerifier):
    """Keep the verifier that every row's phone is normalized and verified with."""
    self.verifier = verifier
verifier
def process_csv( self, csv_text: str, phone_column: str = 'phone') -> list[PhoneVerification]:
def process_csv(self, csv_text: str, phone_column: str = "phone") -> list[PhoneVerification]:
    """
    Process CSV text content.

    Args:
        csv_text: Whole CSV document, header first; a leading UTF-8 BOM is ignored.
        phone_column: Preferred phone column (case-insensitive, with fallback
            to common phone-like header names).

    Returns:
        Verification results for rows whose phone verified successfully.
        Rows with an empty/missing/invalid phone are logged and skipped.

    Raises:
        ValueError: if no phone column can be found in the header.
    """
    results = []

    try:
        # Strip UTF-8 BOM if present (Excel on Windows adds this)
        reader = csv.DictReader(StringIO(csv_text.lstrip("\ufeff")))

        # Find phone column (case-insensitive)
        headers = reader.fieldnames or []
        phone_col = self._find_phone_column(headers, phone_column)
        if not phone_col:
            raise ValueError(f"Phone column '{phone_column}' not found in CSV")

        logger.info(f"Starting CSV processing using phone column '{phone_col}'")

        # row_num counts parsed records (header is 1); it can differ from
        # physical line numbers when DictReader skips empty rows.
        for row_num, row in enumerate(reader, start=2):
            try:
                # Short rows leave missing cells as None (DictReader restval);
                # treat them as empty instead of raising AttributeError.
                phone = (row.get(phone_col) or "").strip()
                if not phone:
                    logger.warning(f"Empty phone at row {row_num}")
                    continue

                result = self.verifier.verify(phone)
                results.append(result)

                # Log progress every 100 verified phones
                if len(results) % 100 == 0:
                    logger.info(f"Processed {len(results)} phones (at row {row_num})")

            except ValueError as e:
                logger.warning(f"Invalid phone at row {row_num}: {str(e)}")
            except Exception as e:
                logger.error(f"Error processing row {row_num}: {str(e)}")

        logger.info(f"Completed processing {len(results)} valid phones")

    except Exception as e:
        logger.error(f"CSV processing failed: {str(e)}")
        raise

    return results

Process CSV text content. Returns list of verification results.

def generate_results_csv( self, original_csv_text: str, results: list[PhoneVerification]) -> str:
def generate_results_csv(
    self,
    original_csv_text: str,
    results: list[PhoneVerification],
    phone_column: str = "phone",
) -> str:
    """
    Generate CSV with original data plus verification results.

    Adds columns: line_type, dnc. ``phone_column`` selects the phone column
    exactly as in ``process_csv``; pass the same value so both steps use the
    same column.

    ``line_type`` is ``"invalid"`` when a phone cannot be normalized and
    ``"unknown"`` when it normalizes but has no verification result.

    Returns:
        CSV text written with the ``csv`` module's default dialect.
    """
    results_map = {r.phone_number: r for r in results}

    # Parse original CSV (strip UTF-8 BOM if present)
    reader = csv.DictReader(StringIO(original_csv_text.lstrip("\ufeff")))
    headers = list(reader.fieldnames or [])

    output = StringIO()
    writer = csv.DictWriter(output, fieldnames=headers + ["line_type", "dnc"])
    writer.writeheader()

    phone_col = self._find_phone_column(headers, phone_column)

    for row in reader:
        # Missing column or short row (None cell) -> "" -> reported as invalid.
        phone = (row.get(phone_col) or "").strip() if phone_col else ""
        try:
            normalized = self.verifier.normalize_phone(phone)
        except ValueError:
            row["line_type"] = "invalid"
            row["dnc"] = ""
        else:
            result = results_map.get(normalized)
            if result is None:
                row["line_type"] = "unknown"
                row["dnc"] = ""
            else:
                row["line_type"] = result.line_type.value
                row["dnc"] = "true" if result.dnc else "false"

        writer.writerow(row)

    return output.getvalue()

Generate CSV with original data plus verification results. Adds columns: line_type, dnc Returns CSV text string.

def process_csv_stream( self, lines: Iterable[str], phone_column: str = 'phone', batch_size: int = 100) -> Iterator[list[PhoneVerification]]:
def process_csv_stream(
    self, lines: Iterable[str], phone_column: str = "phone", batch_size: int = 100
) -> Iterator[list[PhoneVerification]]:
    """
    Process CSV lines as a stream, yielding batches of results.

    Lines are pulled from ``lines`` lazily rather than materialized into a
    list. Each data line is parsed on its own, so quoted fields spanning
    several lines are not supported.

    Args:
        lines: Iterable of CSV lines (including header)
        phone_column: Column name containing phone numbers
        batch_size: Number of results to accumulate before yielding

    Yields:
        Batches of PhoneVerification results

    Raises:
        ValueError: if no phone column can be found in the header.
    """
    lines_iter = iter(lines)
    try:
        # Parse header (strip UTF-8 BOM if present)
        header_line = next(lines_iter).lstrip("\ufeff")
    except StopIteration:
        logger.error("Empty CSV stream")
        return

    headers = csv.DictReader(StringIO(header_line)).fieldnames or []
    phone_col = self._find_phone_column(headers, phone_column)
    if not phone_col:
        raise ValueError(f"Phone column '{phone_column}' not found in CSV")

    batch = []
    row_num = 2  # Header is row 1; blank lines are skipped without counting
    total_processed = 0

    for line in lines_iter:
        if not line.strip():
            continue

        try:
            row = next(csv.DictReader(StringIO(line), fieldnames=headers))
            # Short rows leave missing cells as None; treat them as empty.
            phone = (row.get(phone_col) or "").strip()
            if not phone:
                logger.warning(f"Empty phone at row {row_num}")
                continue  # `finally` still advances row_num exactly once

            result = self.verifier.verify(phone)
            batch.append(result)
            total_processed += 1

            if len(batch) >= batch_size:
                logger.info(
                    f"Processed batch of {len(batch)} phones (total: {total_processed}, at row {row_num})"
                )
                yield batch
                batch = []

        except ValueError as e:
            logger.warning(f"Invalid phone at row {row_num}: {str(e)}")
        except Exception as e:
            logger.error(f"Error processing row {row_num}: {str(e)}")
        finally:
            row_num += 1

    # Yield remaining results
    if batch:
        logger.info(f"Processed final batch of {len(batch)} phones (total: {total_processed})")
        yield batch

    logger.info(f"Stream processing completed. Total processed: {total_processed}")

Process CSV lines as a stream, yielding batches of results. Memory-efficient for large files.

Args: lines: Iterator of CSV lines (including header) phone_column: Column name containing phone numbers batch_size: Number of results to accumulate before yielding

Yields: Batches of PhoneVerification results

def generate_results_csv_stream( self, original_lines: Iterable[str], results_stream: Iterator[list[PhoneVerification]], phone_column: str = 'phone') -> Iterator[str]:
def generate_results_csv_stream(
    self,
    original_lines: Iterable[str],
    results_stream: Iterator[list[PhoneVerification]],
    phone_column: str = "phone",
) -> Iterator[str]:
    """
    Generate CSV results as a stream, line by line.

    The header is yielded first. ``results_stream`` is then drained into an
    in-memory lookup keyed by phone number, and each remaining line of
    ``original_lines`` is yielded with ``line_type`` and ``dnc`` appended.
    ``original_lines`` is consumed exactly once, so one-shot iterators
    (generators, open files) work. Header and rows go through the same
    ``csv`` writer, so quoting and line terminators are consistent.

    Args:
        original_lines: Iterable of original CSV lines (header first)
        results_stream: Iterator of batched PhoneVerification results
        phone_column: Column name containing phone numbers

    Yields:
        CSV lines with verification results added
    """
    lines_iter = iter(original_lines)
    try:
        # Parse header (strip UTF-8 BOM if present)
        header_line = next(lines_iter).lstrip("\ufeff")
    except StopIteration:
        return

    headers = list(csv.DictReader(StringIO(header_line)).fieldnames or [])

    # A single writer over a reusable buffer keeps quoting and line endings
    # identical for the header and every data row.
    buffer = StringIO()
    writer = csv.DictWriter(buffer, fieldnames=headers + ["line_type", "dnc"])

    def take_buffer() -> str:
        """Return the text written since the last call and clear the buffer."""
        text = buffer.getvalue()
        buffer.seek(0)
        buffer.truncate(0)
        return text

    writer.writeheader()
    yield take_buffer()

    phone_col = self._find_phone_column(headers, phone_column)

    # Every result must be known before any data row can be annotated.
    results_map = {
        result.phone_number: result for batch in results_stream for result in batch
    }

    # Continue from the same iterator: the header has already been consumed.
    for line in lines_iter:
        if not line.strip():
            continue

        row = next(csv.DictReader(StringIO(line), fieldnames=headers))
        # Missing column or short row (None cell) -> "" -> reported as invalid.
        phone = (row.get(phone_col) or "").strip() if phone_col else ""
        try:
            normalized = self.verifier.normalize_phone(phone)
        except ValueError:
            row["line_type"] = "invalid"
            row["dnc"] = ""
        else:
            result = results_map.get(normalized)
            if result is None:
                row["line_type"] = "unknown"
                row["dnc"] = ""
            else:
                row["line_type"] = result.line_type.value
                row["dnc"] = "true" if result.dnc else "false"

        writer.writerow(row)
        yield take_buffer()

Generate CSV results as a stream, line by line. Memory-efficient for large files.

Args: original_lines: Iterator of original CSV lines results_stream: Iterator of batched PhoneVerification results phone_column: Column name containing phone numbers

Yields: CSV lines with verification results added

class DynamoDBCache:
 17class DynamoDBCache:
 18    """Cache for phone verification results using DynamoDB with TTL"""
 19
 20    def __init__(self, table_name: str, ttl_days: int = 90):
 21        self.table_name = table_name
 22        self.ttl_days = ttl_days
 23        self.dynamodb = boto3.resource("dynamodb")
 24        self.table = self.dynamodb.Table(table_name)
 25
 26    def get(self, phone_number: str) -> PhoneVerification | None:
 27        """Get cached verification result"""
 28        try:
 29            response = self.table.get_item(Key={"phone_number": phone_number})
 30
 31            if "Item" not in response:
 32                logger.info(f"Cache miss for {phone_number[:6]}***")
 33                return None
 34
 35            item: dict[str, Any] = response["Item"]
 36            logger.info(f"Cache hit for {phone_number[:6]}***")
 37
 38            return PhoneVerification(
 39                phone_number=str(item["phone_number"]),
 40                line_type=LineType(str(item["line_type"])),
 41                dnc=bool(item["dnc"]),
 42                cached=True,
 43                verified_at=datetime.fromisoformat(str(item["verified_at"])),
 44                source=VerificationSource.CACHE,
 45            )
 46
 47        except Exception as e:
 48            logger.error(f"Cache get error: {str(e)}")
 49            return None
 50
 51    def set(self, phone_number: str, verification: PhoneVerification) -> None:
 52        """Store verification result in cache"""
 53        try:
 54            ttl = int((datetime.now(UTC) + timedelta(days=self.ttl_days)).timestamp())
 55
 56            self.table.put_item(
 57                Item={
 58                    "phone_number": phone_number,
 59                    "line_type": verification.line_type.value,
 60                    "dnc": verification.dnc,
 61                    "verified_at": verification.verified_at.isoformat(),
 62                    "source": verification.source.value,
 63                    "ttl": ttl,
 64                }
 65            )
 66
 67            logger.info(f"Cached result for {phone_number[:6]}***")
 68
 69        except Exception as e:
 70            logger.error(f"Cache set error: {str(e)}")
 71            # Don't fail the request if cache write fails
 72
 73    def batch_get(self, phone_numbers: list[str]) -> dict[str, PhoneVerification | None]:
 74        """Get multiple cached results"""
 75        results: dict[str, PhoneVerification | None] = {}
 76
 77        # DynamoDB batch get (max 100 items per request)
 78        for i in range(0, len(phone_numbers), 100):
 79            batch = phone_numbers[i : i + 100]
 80
 81            try:
 82                response = self.dynamodb.batch_get_item(
 83                    RequestItems={
 84                        self.table_name: {"Keys": [{"phone_number": phone} for phone in batch]}
 85                    }
 86                )
 87
 88                for item in response.get("Responses", {}).get(self.table_name, []):
 89                    phone = str(item["phone_number"])
 90                    results[phone] = PhoneVerification(
 91                        phone_number=phone,
 92                        line_type=LineType(str(item["line_type"])),
 93                        dnc=bool(item["dnc"]),
 94                        cached=True,
 95                        verified_at=datetime.fromisoformat(str(item["verified_at"])),
 96                        source=VerificationSource.CACHE,
 97                    )
 98
 99            except Exception as e:
100                logger.error(f"Batch cache get error: {str(e)}")
101
102        # Fill in None for misses
103        for phone in phone_numbers:
104            if phone not in results:
105                results[phone] = None
106
107        return results

Cache for phone verification results using DynamoDB with TTL

DynamoDBCache(table_name: str, ttl_days: int = 90)
20    def __init__(self, table_name: str, ttl_days: int = 90):
21        self.table_name = table_name
22        self.ttl_days = ttl_days
23        self.dynamodb = boto3.resource("dynamodb")
24        self.table = self.dynamodb.Table(table_name)
table_name
ttl_days
dynamodb
table
def get( self, phone_number: str) -> PhoneVerification | None:
26    def get(self, phone_number: str) -> PhoneVerification | None:
27        """Get cached verification result"""
28        try:
29            response = self.table.get_item(Key={"phone_number": phone_number})
30
31            if "Item" not in response:
32                logger.info(f"Cache miss for {phone_number[:6]}***")
33                return None
34
35            item: dict[str, Any] = response["Item"]
36            logger.info(f"Cache hit for {phone_number[:6]}***")
37
38            return PhoneVerification(
39                phone_number=str(item["phone_number"]),
40                line_type=LineType(str(item["line_type"])),
41                dnc=bool(item["dnc"]),
42                cached=True,
43                verified_at=datetime.fromisoformat(str(item["verified_at"])),
44                source=VerificationSource.CACHE,
45            )
46
47        except Exception as e:
48            logger.error(f"Cache get error: {str(e)}")
49            return None

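Get cached verification result.

Returns None on a cache miss. Also returns None when the lookup or item parsing raises any exception; the error is logged, not raised.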

def set( self, phone_number: str, verification: PhoneVerification) -> None:
51    def set(self, phone_number: str, verification: PhoneVerification) -> None:
52        """Store verification result in cache"""
53        try:
54            ttl = int((datetime.now(UTC) + timedelta(days=self.ttl_days)).timestamp())
55
56            self.table.put_item(
57                Item={
58                    "phone_number": phone_number,
59                    "line_type": verification.line_type.value,
60                    "dnc": verification.dnc,
61                    "verified_at": verification.verified_at.isoformat(),
62                    "source": verification.source.value,
63                    "ttl": ttl,
64                }
65            )
66
67            logger.info(f"Cached result for {phone_number[:6]}***")
68
69        except Exception as e:
70            logger.error(f"Cache set error: {str(e)}")
71            # Don't fail the request if cache write fails

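Store verification result in cache.

The item is written with a `ttl` attribute set `ttl_days` from now. Write failures are logged and swallowed so that a cache error never fails the request.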

def batch_get( self, phone_numbers: list[str]) -> dict[str, PhoneVerification | None]:
 73    def batch_get(self, phone_numbers: list[str]) -> dict[str, PhoneVerification | None]:
 74        """Get multiple cached results"""
 75        results: dict[str, PhoneVerification | None] = {}
 76
 77        # DynamoDB batch get (max 100 items per request)
 78        for i in range(0, len(phone_numbers), 100):
 79            batch = phone_numbers[i : i + 100]
 80
 81            try:
 82                response = self.dynamodb.batch_get_item(
 83                    RequestItems={
 84                        self.table_name: {"Keys": [{"phone_number": phone} for phone in batch]}
 85                    }
 86                )
 87
 88                for item in response.get("Responses", {}).get(self.table_name, []):
 89                    phone = str(item["phone_number"])
 90                    results[phone] = PhoneVerification(
 91                        phone_number=phone,
 92                        line_type=LineType(str(item["line_type"])),
 93                        dnc=bool(item["dnc"]),
 94                        cached=True,
 95                        verified_at=datetime.fromisoformat(str(item["verified_at"])),
 96                        source=VerificationSource.CACHE,
 97                    )
 98
 99            except Exception as e:
100                logger.error(f"Batch cache get error: {str(e)}")
101
102        # Fill in None for misses
103        for phone in phone_numbers:
104            if phone not in results:
105                results[phone] = None
106
107        return results

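Get multiple cached results.

Numbers are looked up in chunks of 100 per DynamoDB batch request. The returned dict has an entry for every requested number, with None for misses. If a chunk's request fails, the error is logged and that chunk's numbers are reported as misses. Any `UnprocessedKeys` in a response are not retried, so those numbers are also reported as misses.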

def compute_key_hash(key: str) -> str:
40def compute_key_hash(key: str) -> str:
41    """Compute the SHA-256 hash of an API key.
42
43    Args:
44        key: The full API key string.
45
46    Returns:
47        Hex digest of the SHA-256 hash.
48    """
49    return hashlib.sha256(key.encode()).hexdigest()

Compute the SHA-256 hash of an API key.

Args: key: The full API key string.

Returns: Hex digest of the SHA-256 hash.

def generate_key_id() -> str:
22def generate_key_id() -> str:
23    """Generate a unique key ID with mk_ prefix.
24
25    Returns a key ID in the format ``mk_<24 hex chars>`` using 96-bit
26    entropy via :func:`secrets.token_hex`.
27    """
28    return f"{KEY_ID_PREFIX}{secrets.token_hex(12)}"

Generate a unique key ID with mk_ prefix.

Returns a key ID in the format mk_<24 hex chars> using 96-bit entropy via secrets.token_hex().

def generate_managed_key() -> str:
31def generate_managed_key() -> str:
32    """Generate a new managed API key.
33
34    Returns a key in the format ``lls_mk_<40 hex chars>`` using 160-bit
35    entropy via :func:`secrets.token_hex`.
36    """
37    return f"{MANAGED_KEY_PREFIX}{secrets.token_hex(20)}"

Generate a new managed API key.

Returns a key in the format lls_mk_<40 hex chars> using 160-bit entropy via secrets.token_hex().

def validate_expiration_days(days: int) -> bool:
52def validate_expiration_days(days: int) -> bool:
53    """Validate that an expiration period is within the allowed range.
54
55    Args:
56        days: Number of days until key expiration.
57
58    Returns:
59        ``True`` if *days* is between 1 and 730 inclusive, ``False`` otherwise.
60    """
61    return MIN_EXPIRATION_DAYS <= days <= MAX_EXPIRATION_DAYS

Validate that an expiration period is within the allowed range.

Args: days: Number of days until key expiration.

Returns: True if days is between 1 and 730 inclusive, False otherwise.

class DecimalEncoder(json.encoder.JSONEncoder):
 8class DecimalEncoder(json.JSONEncoder):
 9    """JSON encoder that handles DynamoDB Decimal types."""
10
11    def default(self, obj: object) -> object:
12        if isinstance(obj, Decimal):
13            return int(obj) if obj % 1 == 0 else float(obj)
14        return super().default(obj)

JSON encoder that handles DynamoDB Decimal types.

def default(self, obj: object) -> object:
11    def default(self, obj: object) -> object:
12        if isinstance(obj, Decimal):
13            return int(obj) if obj % 1 == 0 else float(obj)
14        return super().default(obj)

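DecimalEncoder's override converts `Decimal` values to `int` when they have no fractional part and to `float` otherwise. It passes every other object to the base implementation, which raises a TypeError. The generic `JSONEncoder.default` documentation follows.

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).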

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)
def extract_area_code(phone: str) -> str:
17def extract_area_code(phone: str) -> str:
18    """Extract 3-digit area code from a phone number string.
19
20    Handles E.164 format (+1XXXXXXXXXX) and raw digits.
21    Returns 'unknown' if fewer than 3 digits.
22    """
23    digits = "".join(c for c in phone if c.isdigit())
24    if digits.startswith("1") and len(digits) >= 4:
25        return digits[1:4]
26    if len(digits) >= 3:
27        return digits[:3]
28    return "unknown"

Extract 3-digit area code from a phone number string.

Handles E.164 format (+1XXXXXXXXXX) and raw digits. Returns 'unknown' if fewer than 3 digits.

class ManagedApiKeyService:
 46class ManagedApiKeyService:
 47    """Manages user API keys with CRUD operations in DynamoDB.
 48
 49    DynamoDB table schema:
 50        - Hash key: user_id (S)
 51        - Range key: key_id (S)
 52    """
 53
 54    table: "Table | None"
 55
 56    def __init__(self, table_name: str | None = None):
 57        """Initialize with DynamoDB table."""
 58        if not HAS_BOTO3 or not boto3:
 59            raise RuntimeError("boto3 is required for ManagedApiKeyService")
 60
 61        self.dynamodb = boto3.resource("dynamodb")
 62        self.table_name = table_name if table_name else os.environ["MANAGED_API_KEYS_TABLE"]
 63
 64        try:
 65            self.table = self.dynamodb.Table(self.table_name)
 66        except Exception as e:
 67            logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}")
 68            self.table = None
 69
 70    def _get_key(self, user_id: str, key_id: str) -> dict[str, Any]:
 71        """Fetch a key item, raising if not found or revoked."""
 72        if not self.table:
 73            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
 74
 75        response = self.table.get_item(Key={"user_id": user_id, "key_id": key_id})
 76        item = response.get("Item")
 77        if not item:
 78            raise KeyNotFoundError(f"Key {key_id} not found for user {user_id}")
 79        if item.get("status") == "revoked":
 80            raise RevokedKeyError(f"Key {key_id} is revoked")
 81        return item
 82
 83    def list_keys(self, user_id: str) -> list[dict[str, Any]]:
 84        """List all API keys for a user, sorted by created_at descending.
 85
 86        Returns projected fields only (excludes key_hash).
 87        """
 88        if not self.table:
 89            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
 90
 91        response = self.table.query(
 92            KeyConditionExpression="user_id = :uid",
 93            ExpressionAttributeValues={":uid": user_id},
 94        )
 95        items = response.get("Items", [])
 96
 97        result = []
 98        for item in items:
 99            result.append(
100                {
101                    "key_id": item["key_id"],
102                    "key_last4": item.get("key_last4", ""),
103                    "label": item.get("label", ""),
104                    "status": item.get("status", "active"),
105                    "created_at": item.get("created_at", ""),
106                    "expires_at": item.get("expires_at"),
107                    "last_used_at": item.get("last_used_at"),
108                }
109            )
110
111        result.sort(key=lambda x: str(x.get("created_at", "")), reverse=True)
112        return result
113
114    def create_key(self, user_id: str, label: str, expires_in_days: int = 365) -> dict[str, Any]:
115        """Create a new managed API key.
116
117        Returns the key_id and plaintext key (only time key is returned).
118        """
119        if not self.table:
120            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
121
122        # Validate label
123        label = label.strip()
124        if not label or len(label) > MAX_LABEL_LENGTH:
125            raise ValueError(f"Label must be 1-{MAX_LABEL_LENGTH} characters, got {len(label)}")
126
127        # Validate expiration
128        if not validate_expiration_days(expires_in_days):
129            raise ValueError(f"Expiration must be 1-730 days, got {expires_in_days}")
130
131        # Check active key count
132        existing = self.list_keys(user_id)
133        active_count = sum(1 for k in existing if k["status"] != "revoked")
134        if active_count >= MAX_ACTIVE_KEYS:
135            raise LimitExceededError(f"Maximum of {MAX_ACTIVE_KEYS} active keys reached")
136
137        # Generate key
138        key_id = generate_key_id()
139        plaintext_key = generate_managed_key()
140        key_hash = compute_key_hash(plaintext_key)
141        now = datetime.now(UTC).isoformat()
142        expires_at = (datetime.now(UTC) + timedelta(days=expires_in_days)).isoformat()
143
144        self.table.put_item(
145            Item={
146                "user_id": user_id,
147                "key_id": key_id,
148                "key_hash": key_hash,
149                "key_last4": plaintext_key[-4:],
150                "label": label,
151                "status": "active",
152                "created_at": now,
153                "expires_at": expires_at,
154                "last_used_at": None,
155                "usage_count": 0,
156            }
157        )
158
159        logger.info(f"Created managed key {key_id} for user {user_id}")
160        return {
161            "key_id": key_id,
162            "api_key": plaintext_key,
163            "label": label,
164            "expires_at": expires_at,
165        }
166
167    def update_key(
168        self,
169        user_id: str,
170        key_id: str,
171        label: str | None = None,
172        expires_in_days: int | None = None,
173    ) -> dict[str, Any]:
174        """Update key label and/or expiration."""
175        if label is None and expires_in_days is None:
176            raise ValueError("At least one of label or expires_in_days must be provided")
177
178        # This will raise KeyNotFoundError or RevokedKeyError
179        self._get_key(user_id, key_id)
180
181        update_parts = ["SET updated_at = :now"]
182        expr_values: dict[str, Any] = {":now": datetime.now(UTC).isoformat()}
183
184        if label is not None:
185            label = label.strip()
186            if not label or len(label) > MAX_LABEL_LENGTH:
187                raise ValueError(f"Label must be 1-{MAX_LABEL_LENGTH} characters")
188            update_parts.append("label = :label")
189            expr_values[":label"] = label
190
191        if expires_in_days is not None:
192            if not validate_expiration_days(expires_in_days):
193                raise ValueError(f"Expiration must be 1-730 days, got {expires_in_days}")
194            expires_at = (datetime.now(UTC) + timedelta(days=expires_in_days)).isoformat()
195            update_parts.append("expires_at = :expires_at")
196            expr_values[":expires_at"] = expires_at
197
198        update_expr = update_parts[0]
199        if len(update_parts) > 1:
200            update_expr += ", " + ", ".join(update_parts[1:])
201
202        if not self.table:
203            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
204
205        self.table.update_item(
206            Key={"user_id": user_id, "key_id": key_id},
207            UpdateExpression=update_expr,
208            ExpressionAttributeValues=expr_values,
209        )
210
211        logger.info(f"Updated managed key {key_id} for user {user_id}")
212        return {"message": "Key updated"}
213
214    def rotate_key(self, user_id: str, key_id: str) -> dict[str, Any]:
215        """Generate a new key value while keeping the same key_id.
216
217        Returns the new plaintext key (only time it's returned).
218        """
219        # This will raise KeyNotFoundError or RevokedKeyError
220        self._get_key(user_id, key_id)
221
222        plaintext_key = generate_managed_key()
223        key_hash = compute_key_hash(plaintext_key)
224        now = datetime.now(UTC).isoformat()
225
226        if not self.table:
227            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
228
229        self.table.update_item(
230            Key={"user_id": user_id, "key_id": key_id},
231            UpdateExpression=("SET key_hash = :hash, key_last4 = :last4, updated_at = :now"),
232            ExpressionAttributeValues={
233                ":hash": key_hash,
234                ":last4": plaintext_key[-4:],
235                ":now": now,
236            },
237        )
238
239        logger.info(f"Rotated managed key {key_id} for user {user_id}")
240        return {
241            "key_id": key_id,
242            "api_key": plaintext_key,
243            "label": "",
244            "expires_at": "",
245        }
246
247    def revoke_key(self, user_id: str, key_id: str) -> None:
248        """Mark a key as revoked with TTL for automatic cleanup."""
249        if not self.table:
250            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
251
252        # Check key exists (but allow revoking already-revoked keys)
253        response = self.table.get_item(Key={"user_id": user_id, "key_id": key_id})
254        if not response.get("Item"):
255            raise KeyNotFoundError(f"Key {key_id} not found for user {user_id}")
256
257        now = datetime.now(UTC)
258        ttl = int((now + timedelta(days=REVOKE_TTL_DAYS)).timestamp())
259
260        self.table.update_item(
261            Key={"user_id": user_id, "key_id": key_id},
262            UpdateExpression=("SET #s = :revoked, revoked_at = :now, #ttl = :ttl"),
263            ExpressionAttributeNames={"#s": "status", "#ttl": "ttl"},
264            ExpressionAttributeValues={
265                ":revoked": "revoked",
266                ":now": now.isoformat(),
267                ":ttl": ttl,
268            },
269        )
270
271        logger.info(f"Revoked managed key {key_id} for user {user_id}")

Manages user API keys with CRUD operations in DynamoDB.

DynamoDB table schema: hash key `user_id` (S); range key `key_id` (S).

ManagedApiKeyService(table_name: str | None = None)
56    def __init__(self, table_name: str | None = None):
57        """Initialize with DynamoDB table."""
58        if not HAS_BOTO3 or not boto3:
59            raise RuntimeError("boto3 is required for ManagedApiKeyService")
60
61        self.dynamodb = boto3.resource("dynamodb")
62        self.table_name = table_name if table_name else os.environ["MANAGED_API_KEYS_TABLE"]
63
64        try:
65            self.table = self.dynamodb.Table(self.table_name)
66        except Exception as e:
67            logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}")
68            self.table = None

Initialize with DynamoDB table.

table: mypy_boto3_dynamodb.service_resource.Table | None
dynamodb
table_name
def list_keys(self, user_id: str) -> list[dict[str, typing.Any]]:
 83    def list_keys(self, user_id: str) -> list[dict[str, Any]]:
 84        """List all API keys for a user, sorted by created_at descending.
 85
 86        Returns projected fields only (excludes key_hash).
 87        """
 88        if not self.table:
 89            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
 90
 91        response = self.table.query(
 92            KeyConditionExpression="user_id = :uid",
 93            ExpressionAttributeValues={":uid": user_id},
 94        )
 95        items = response.get("Items", [])
 96
 97        result = []
 98        for item in items:
 99            result.append(
100                {
101                    "key_id": item["key_id"],
102                    "key_last4": item.get("key_last4", ""),
103                    "label": item.get("label", ""),
104                    "status": item.get("status", "active"),
105                    "created_at": item.get("created_at", ""),
106                    "expires_at": item.get("expires_at"),
107                    "last_used_at": item.get("last_used_at"),
108                }
109            )
110
111        result.sort(key=lambda x: str(x.get("created_at", "")), reverse=True)
112        return result

List all API keys for a user, sorted by created_at descending.

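Returns projected fields only: key_id, key_last4, label, status, created_at, expires_at and last_used_at. Other stored attributes, such as key_hash and usage_count, are excluded.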

def create_key( self, user_id: str, label: str, expires_in_days: int = 365) -> dict[str, typing.Any]:
114    def create_key(self, user_id: str, label: str, expires_in_days: int = 365) -> dict[str, Any]:
115        """Create a new managed API key.
116
117        Returns the key_id and plaintext key (only time key is returned).
118        """
119        if not self.table:
120            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
121
122        # Validate label
123        label = label.strip()
124        if not label or len(label) > MAX_LABEL_LENGTH:
125            raise ValueError(f"Label must be 1-{MAX_LABEL_LENGTH} characters, got {len(label)}")
126
127        # Validate expiration
128        if not validate_expiration_days(expires_in_days):
129            raise ValueError(f"Expiration must be 1-730 days, got {expires_in_days}")
130
131        # Check active key count
132        existing = self.list_keys(user_id)
133        active_count = sum(1 for k in existing if k["status"] != "revoked")
134        if active_count >= MAX_ACTIVE_KEYS:
135            raise LimitExceededError(f"Maximum of {MAX_ACTIVE_KEYS} active keys reached")
136
137        # Generate key
138        key_id = generate_key_id()
139        plaintext_key = generate_managed_key()
140        key_hash = compute_key_hash(plaintext_key)
141        now = datetime.now(UTC).isoformat()
142        expires_at = (datetime.now(UTC) + timedelta(days=expires_in_days)).isoformat()
143
144        self.table.put_item(
145            Item={
146                "user_id": user_id,
147                "key_id": key_id,
148                "key_hash": key_hash,
149                "key_last4": plaintext_key[-4:],
150                "label": label,
151                "status": "active",
152                "created_at": now,
153                "expires_at": expires_at,
154                "last_used_at": None,
155                "usage_count": 0,
156            }
157        )
158
159        logger.info(f"Created managed key {key_id} for user {user_id}")
160        return {
161            "key_id": key_id,
162            "api_key": plaintext_key,
163            "label": label,
164            "expires_at": expires_at,
165        }

Create a new managed API key.

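Returns the key_id, the plaintext key, the label and expires_at. This is the only time the plaintext key is returned; only its SHA-256 hash and last four characters are stored.

Raises ValueError if the stripped label is empty or too long, or if the expiration is out of range. Raises LimitExceededError if the user already has the maximum number of non-revoked keys.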

def update_key( self, user_id: str, key_id: str, label: str | None = None, expires_in_days: int | None = None) -> dict[str, typing.Any]:
167    def update_key(
168        self,
169        user_id: str,
170        key_id: str,
171        label: str | None = None,
172        expires_in_days: int | None = None,
173    ) -> dict[str, Any]:
174        """Update key label and/or expiration."""
175        if label is None and expires_in_days is None:
176            raise ValueError("At least one of label or expires_in_days must be provided")
177
178        # This will raise KeyNotFoundError or RevokedKeyError
179        self._get_key(user_id, key_id)
180
181        update_parts = ["SET updated_at = :now"]
182        expr_values: dict[str, Any] = {":now": datetime.now(UTC).isoformat()}
183
184        if label is not None:
185            label = label.strip()
186            if not label or len(label) > MAX_LABEL_LENGTH:
187                raise ValueError(f"Label must be 1-{MAX_LABEL_LENGTH} characters")
188            update_parts.append("label = :label")
189            expr_values[":label"] = label
190
191        if expires_in_days is not None:
192            if not validate_expiration_days(expires_in_days):
193                raise ValueError(f"Expiration must be 1-730 days, got {expires_in_days}")
194            expires_at = (datetime.now(UTC) + timedelta(days=expires_in_days)).isoformat()
195            update_parts.append("expires_at = :expires_at")
196            expr_values[":expires_at"] = expires_at
197
198        update_expr = update_parts[0]
199        if len(update_parts) > 1:
200            update_expr += ", " + ", ".join(update_parts[1:])
201
202        if not self.table:
203            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
204
205        self.table.update_item(
206            Key={"user_id": user_id, "key_id": key_id},
207            UpdateExpression=update_expr,
208            ExpressionAttributeValues=expr_values,
209        )
210
211        logger.info(f"Updated managed key {key_id} for user {user_id}")
212        return {"message": "Key updated"}

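Update key label and/or expiration.

At least one of `label` or `expires_in_days` must be given; otherwise ValueError is raised. The new expiration is computed as `expires_in_days` from the current time, not from the previous expiry. Raises KeyNotFoundError if the key does not exist and RevokedKeyError if it has been revoked.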

def rotate_key(self, user_id: str, key_id: str) -> dict[str, typing.Any]:
214    def rotate_key(self, user_id: str, key_id: str) -> dict[str, Any]:
215        """Generate a new key value while keeping the same key_id.
216
217        Returns the new plaintext key (only time it's returned).
218        """
219        # This will raise KeyNotFoundError or RevokedKeyError
220        self._get_key(user_id, key_id)
221
222        plaintext_key = generate_managed_key()
223        key_hash = compute_key_hash(plaintext_key)
224        now = datetime.now(UTC).isoformat()
225
226        if not self.table:
227            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
228
229        self.table.update_item(
230            Key={"user_id": user_id, "key_id": key_id},
231            UpdateExpression=("SET key_hash = :hash, key_last4 = :last4, updated_at = :now"),
232            ExpressionAttributeValues={
233                ":hash": key_hash,
234                ":last4": plaintext_key[-4:],
235                ":now": now,
236            },
237        )
238
239        logger.info(f"Rotated managed key {key_id} for user {user_id}")
240        return {
241            "key_id": key_id,
242            "api_key": plaintext_key,
243            "label": "",
244            "expires_at": "",
245        }

Generate a new key value while keeping the same key_id.

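Returns the new plaintext key (the only time it is returned).

Only the stored hash, last four characters and `updated_at` change. The stored label and expiration are left as they were. The returned `label` and `expires_at` fields are always empty strings and do not reflect the stored values.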

def revoke_key(self, user_id: str, key_id: str) -> None:
247    def revoke_key(self, user_id: str, key_id: str) -> None:
248        """Mark a key as revoked with TTL for automatic cleanup."""
249        if not self.table:
250            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
251
252        # Check key exists (but allow revoking already-revoked keys)
253        response = self.table.get_item(Key={"user_id": user_id, "key_id": key_id})
254        if not response.get("Item"):
255            raise KeyNotFoundError(f"Key {key_id} not found for user {user_id}")
256
257        now = datetime.now(UTC)
258        ttl = int((now + timedelta(days=REVOKE_TTL_DAYS)).timestamp())
259
260        self.table.update_item(
261            Key={"user_id": user_id, "key_id": key_id},
262            UpdateExpression=("SET #s = :revoked, revoked_at = :now, #ttl = :ttl"),
263            ExpressionAttributeNames={"#s": "status", "#ttl": "ttl"},
264            ExpressionAttributeValues={
265                ":revoked": "revoked",
266                ":now": now.isoformat(),
267                ":ttl": ttl,
268            },
269        )
270
271        logger.info(f"Revoked managed key {key_id} for user {user_id}")

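Mark a key as revoked with TTL for automatic cleanup.

Revoking an already-revoked key is allowed and refreshes its `revoked_at` and TTL. Raises KeyNotFoundError if the key does not exist.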

class KeyNotFoundError(builtins.ValueError):
34class KeyNotFoundError(ValueError):
35    """Raised when a managed API key is not found."""

Raised when a managed API key is not found.

class RevokedKeyError(builtins.ValueError):
38class RevokedKeyError(ValueError):
39    """Raised when attempting to modify a revoked key."""

Raised when attempting to modify a revoked key.

class LimitExceededError(builtins.ValueError):
42class LimitExceededError(ValueError):
43    """Raised when the active key limit is reached."""

Raised when the active key limit is reached.

class ProviderError(builtins.ValueError):
 5class ProviderError(ValueError):
 6    """Raised when an upstream verification provider fails.
 7
 8    Extends ValueError for backward compatibility with existing
 9    error handlers. Carries an optional HTTP status code from the
10    upstream response so callers can distinguish client errors from
11    server/network failures.
12    """
13
14    def __init__(self, message: str, status_code: int | None = None):
15        super().__init__(message)
16        self.status_code = status_code

Raised when an upstream verification provider fails.

Extends ValueError for backward compatibility with existing error handlers. Carries an optional HTTP status code from the upstream response so callers can distinguish client errors from server/network failures.

ProviderError(message: str, status_code: int | None = None)
14    def __init__(self, message: str, status_code: int | None = None):
15        super().__init__(message)
16        self.status_code = status_code
status_code
class FileService:
 88class FileService:
 89    """Manages file metadata listing with filtering and pagination.
 90
 91    DynamoDB table schema:
 92        - Hash key: file_id (S)
 93        - GSI: user-index on user_id (HASH) / created_at (RANGE)
 94    """
 95
 96    table: "Table | None"
 97
 98    def __init__(self, table_name: str | None = None):
 99        """Initialize with DynamoDB table."""
100        if not HAS_BOTO3 or not boto3:
101            raise RuntimeError("boto3 is required for FileService")
102
103        self.dynamodb = boto3.resource("dynamodb")
104        self.table_name = table_name if table_name else os.environ["FILES_TABLE"]
105
106        try:
107            self.table = self.dynamodb.Table(self.table_name)
108        except Exception as e:
109            logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}")
110            self.table = None
111
112    def _query_all_user_files(self, user_id: str) -> list[dict[str, Any]]:
113        """Query all files for a user via the user-index GSI."""
114        if not self.table:
115            raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
116
117        items: list[dict[str, Any]] = []
118        kwargs: dict[str, Any] = {
119            "IndexName": "user-index",
120            "KeyConditionExpression": "user_id = :uid",
121            "ExpressionAttributeValues": {":uid": user_id},
122        }
123
124        while True:
125            response = self.table.query(**kwargs)
126            items.extend(response.get("Items", []))
127            last_key = response.get("LastEvaluatedKey")
128            if not last_key:
129                break
130            kwargs["ExclusiveStartKey"] = last_key
131
132        return items
133
134    def list_user_files(
135        self,
136        user_id: str,
137        page: int = 1,
138        limit: int = 20,
139        search: str = "",
140        sort_by: str = "created_at",
141        sort_order: str = "desc",
142        status_filter: str = "",
143    ) -> dict[str, Any]:
144        """List files for a user with filtering, sorting, and pagination.
145
146        Args:
147            user_id: User identifier.
148            page: Page number (1-indexed).
149            limit: Items per page.
150            search: Case-insensitive substring match on filename.
151            sort_by: Column to sort by (from ALLOWED_SORT_COLUMNS).
152            sort_order: 'asc' or 'desc'.
153            status_filter: Filter by file status.
154
155        Returns:
156            Dict with files list and pagination metadata.
157        """
158        items = self._query_all_user_files(user_id)
159
160        # Apply status filter
161        if status_filter and status_filter in ALLOWED_STATUSES:
162            items = [i for i in items if i.get("status") == status_filter]
163
164        # Apply search filter
165        if search:
166            search_lower = search.lower()
167            items = [i for i in items if search_lower in _get_filename(i).lower()]
168
169        # Sort
170        sort_key = SORT_KEY_MAP.get(sort_by, SORT_KEY_MAP["created_at"])
171        reverse = sort_order != "asc"
172        items.sort(key=sort_key, reverse=reverse)
173
174        # Pagination
175        total_count = len(items)
176        total_pages = max(1, math.ceil(total_count / limit))
177        start = (page - 1) * limit
178        end = start + limit
179        page_items = items[start:end]
180
181        return {
182            "files": [_map_file_fields(item) for item in page_items],
183            "page": page,
184            "per_page": limit,
185            "has_more": end < total_count,
186            "total_pages": total_pages,
187            "total_files_count": total_count,
188        }
189
190    def list_user_files_admin(
191        self,
192        user_id: str,
193        page: int = 1,
194        limit: int = 20,
195    ) -> dict[str, Any]:
196        """Admin view of user files with raw S3 keys.
197
198        Returns raw S3 keys for the handler to generate presigned URLs.
199        """
200        items = self._query_all_user_files(user_id)
201
202        # Sort newest first
203        items.sort(key=lambda i: i.get("created_at", ""), reverse=True)
204
205        total = len(items)
206        start = (page - 1) * limit
207        end = start + limit
208        page_items = items[start:end]
209
210        return {
211            "files": [_map_admin_fields(item) for item in page_items],
212            "total": total,
213            "page": page,
214            "limit": limit,
215        }

Manages file metadata listing with filtering and pagination.

DynamoDB table schema:
- Hash key: file_id (S)
- GSI: user-index on user_id (HASH) / created_at (RANGE)

FileService(table_name: str | None = None)
 98    def __init__(self, table_name: str | None = None):
 99        """Initialize with DynamoDB table."""
100        if not HAS_BOTO3 or not boto3:
101            raise RuntimeError("boto3 is required for FileService")
102
103        self.dynamodb = boto3.resource("dynamodb")
104        self.table_name = table_name if table_name else os.environ["FILES_TABLE"]
105
106        try:
107            self.table = self.dynamodb.Table(self.table_name)
108        except Exception as e:
109            logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}")
110            self.table = None

Initialize with DynamoDB table.

table: mypy_boto3_dynamodb.service_resource.Table | None
dynamodb
table_name
def list_user_files( self, user_id: str, page: int = 1, limit: int = 20, search: str = '', sort_by: str = 'created_at', sort_order: str = 'desc', status_filter: str = '') -> dict[str, typing.Any]:
134    def list_user_files(
135        self,
136        user_id: str,
137        page: int = 1,
138        limit: int = 20,
139        search: str = "",
140        sort_by: str = "created_at",
141        sort_order: str = "desc",
142        status_filter: str = "",
143    ) -> dict[str, Any]:
144        """List files for a user with filtering, sorting, and pagination.
145
146        Args:
147            user_id: User identifier.
148            page: Page number (1-indexed).
149            limit: Items per page.
150            search: Case-insensitive substring match on filename.
151            sort_by: Column to sort by (from ALLOWED_SORT_COLUMNS).
152            sort_order: 'asc' or 'desc'.
153            status_filter: Filter by file status.
154
155        Returns:
156            Dict with files list and pagination metadata.
157        """
158        items = self._query_all_user_files(user_id)
159
160        # Apply status filter
161        if status_filter and status_filter in ALLOWED_STATUSES:
162            items = [i for i in items if i.get("status") == status_filter]
163
164        # Apply search filter
165        if search:
166            search_lower = search.lower()
167            items = [i for i in items if search_lower in _get_filename(i).lower()]
168
169        # Sort
170        sort_key = SORT_KEY_MAP.get(sort_by, SORT_KEY_MAP["created_at"])
171        reverse = sort_order != "asc"
172        items.sort(key=sort_key, reverse=reverse)
173
174        # Pagination
175        total_count = len(items)
176        total_pages = max(1, math.ceil(total_count / limit))
177        start = (page - 1) * limit
178        end = start + limit
179        page_items = items[start:end]
180
181        return {
182            "files": [_map_file_fields(item) for item in page_items],
183            "page": page,
184            "per_page": limit,
185            "has_more": end < total_count,
186            "total_pages": total_pages,
187            "total_files_count": total_count,
188        }

List files for a user with filtering, sorting, and pagination.

Args:
- user_id: User identifier.
- page: Page number (1-indexed).
- limit: Items per page.
- search: Case-insensitive substring match on filename.
- sort_by: Column to sort by (from ALLOWED_SORT_COLUMNS).
- sort_order: 'asc' or 'desc'.
- status_filter: Filter by file status.

Returns: Dict with files list and pagination metadata.

def list_user_files_admin( self, user_id: str, page: int = 1, limit: int = 20) -> dict[str, typing.Any]:
190    def list_user_files_admin(
191        self,
192        user_id: str,
193        page: int = 1,
194        limit: int = 20,
195    ) -> dict[str, Any]:
196        """Admin view of user files with raw S3 keys.
197
198        Returns raw S3 keys for the handler to generate presigned URLs.
199        """
200        items = self._query_all_user_files(user_id)
201
202        # Sort newest first
203        items.sort(key=lambda i: i.get("created_at", ""), reverse=True)
204
205        total = len(items)
206        start = (page - 1) * limit
207        end = start + limit
208        page_items = items[start:end]
209
210        return {
211            "files": [_map_admin_fields(item) for item in page_items],
212            "total": total,
213            "page": page,
214            "limit": limit,
215        }

Admin view of user files with raw S3 keys.

Returns raw S3 keys for the handler to generate presigned URLs.