Coverage for src / ai_lls_lib / core / processor.py: 96%
157 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 23:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 23:45 +0000
1"""
2Bulk CSV processing for phone verification
3"""
5import csv
6from collections.abc import Iterable, Iterator, Sequence
7from io import StringIO
9from aws_lambda_powertools import Logger
11from .models import PhoneVerification
12from .verifier import PhoneVerifier
14logger = Logger()
class BulkProcessor:
    """Process CSV files for bulk phone verification.

    The processor delegates the actual lookups to the ``verifier`` it is
    constructed with; it uses ``verifier.verify(phone)`` to obtain a result
    and ``verifier.normalize_phone(phone)`` to match CSV cells back to
    results (keyed by ``result.phone_number``).
    """

    # Substrings that mark a header as a likely phone column. Used only as a
    # fallback when no header matches the requested column name.
    _PHONE_COLUMN_HINTS = (
        "phone",
        "phone_number",
        "phonenumber",
        "mobile",
        "cell",
        "telephone",
        "tel",
        "number",
        "contact",
    )

    def __init__(self, verifier: PhoneVerifier):
        """Store the verifier used for every lookup made by this processor."""
        self.verifier = verifier

    def process_csv(self, csv_text: str, phone_column: str = "phone") -> list[PhoneVerification]:
        """Verify every phone number found in a CSV document.

        Rows with an empty or missing phone cell are skipped with a warning;
        rows whose verification raises are logged and skipped, so one bad row
        never aborts the run.

        Args:
            csv_text: Full CSV content, header row included. A leading UTF-8
                BOM is tolerated.
            phone_column: Preferred name of the phone column
                (case-insensitive); see ``_find_phone_column`` for fallback.

        Returns:
            Verification results for the rows that verified successfully, in
            input order.

        Raises:
            ValueError: If no phone column can be identified in the header.
        """
        results = []

        try:
            # Strip UTF-8 BOM if present (Excel on Windows adds this)
            reader = csv.DictReader(StringIO(csv_text.lstrip("\ufeff")))

            # Find phone column (case-insensitive)
            phone_col = self._find_phone_column(reader.fieldnames or [], phone_column)
            if not phone_col:
                raise ValueError(f"Phone column '{phone_column}' not found in CSV")

            logger.info(f"Starting CSV processing using phone column '{phone_col}'")

            # Row numbers count parsed CSV rows; the header is row 1.
            for row_num, row in enumerate(reader, start=2):
                try:
                    phone = self._extract_phone(row, phone_col)
                    if not phone:
                        logger.warning(f"Empty phone at row {row_num}")
                        continue

                    results.append(self.verifier.verify(phone))

                    # Log progress every 100 rows
                    if len(results) % 100 == 0:
                        logger.info(f"Processed {len(results)} phones (at row {row_num})")

                except ValueError as e:
                    logger.warning(f"Invalid phone at row {row_num}: {str(e)}")
                except Exception as e:
                    # Best effort: a failing row must not abort the whole file.
                    logger.error(f"Error processing row {row_num}: {str(e)}")

            logger.info(f"Completed processing {len(results)} valid phones")

        except Exception as e:
            logger.error(f"CSV processing failed: {str(e)}")
            raise

        return results

    def _find_phone_column(self, headers: list[str] | Sequence[str], preferred: str) -> str | None:
        """Pick the header that holds phone numbers.

        The first header equal to ``preferred`` (ignoring case and
        surrounding whitespace) wins. Otherwise the first header containing
        one of ``_PHONE_COLUMN_HINTS`` as a substring is used.

        Args:
            headers: Header names exactly as they appear in the CSV.
            preferred: Column name requested by the caller.

        Returns:
            The matching header, unmodified so it can be used as a row key,
            or ``None`` when nothing matches.
        """
        wanted = preferred.strip().lower()
        for header in headers:
            # Strip so that "name, phone" style headers (" phone") still match.
            if header.strip().lower() == wanted:
                return header

        for header in headers:
            header_lower = header.lower()
            if any(hint in header_lower for hint in self._PHONE_COLUMN_HINTS):
                logger.info(f"Using column '{header}' as phone column")
                return header

        return None

    @staticmethod
    def _extract_phone(row: dict, phone_col: str | None) -> str:
        """Return the stripped phone cell of ``row``, or ``""`` if absent.

        ``csv.DictReader`` fills cells missing from short rows with ``None``,
        so the value must be guarded before calling ``strip``.
        """
        if phone_col is None:
            return ""
        return (row.get(phone_col) or "").strip()

    def _annotate_row(self, row: dict, phone_col: str | None, results_map: dict) -> None:
        """Set ``line_type`` and ``dnc`` on ``row`` in place.

        ``unknown`` means the normalized phone has no entry in
        ``results_map``; ``invalid`` means normalization or the lookup raised.
        """
        try:
            normalized = self.verifier.normalize_phone(self._extract_phone(row, phone_col))
            if normalized in results_map:
                result = results_map[normalized]
                row["line_type"] = result.line_type.value
                row["dnc"] = "true" if result.dnc else "false"
            else:
                row["line_type"] = "unknown"
                row["dnc"] = ""
        except Exception:
            # Deliberate best effort: any failure marks the row, never aborts.
            row["line_type"] = "invalid"
            row["dnc"] = ""

    def generate_results_csv(
        self,
        original_csv_text: str,
        results: list[PhoneVerification],
        phone_column: str = "phone",
    ) -> str:
        """Generate CSV with original data plus verification results.

        Appends the columns ``line_type`` and ``dnc`` to every row. Cells
        beyond the header width cannot be mapped to a column and are dropped
        rather than failing the whole export.

        Args:
            original_csv_text: The CSV content that was verified. A leading
                UTF-8 BOM is tolerated.
            results: Verification results, matched to rows through
                ``phone_number``.
            phone_column: Preferred name of the phone column
                (case-insensitive). Defaults to ``"phone"``.

        Returns:
            The annotated CSV as a single string.
        """
        # Create lookup dict
        results_map = {r.phone_number: r for r in results}

        # Parse original CSV (strip UTF-8 BOM if present)
        reader = csv.DictReader(StringIO(original_csv_text.lstrip("\ufeff")))
        headers = list(reader.fieldnames or [])
        output_headers = headers + ["line_type", "dnc"]

        # Create output CSV in memory. Surplus cells land under the None key
        # in DictReader rows; "ignore" keeps them from raising ValueError.
        output = StringIO()
        writer = csv.DictWriter(output, fieldnames=output_headers, extrasaction="ignore")
        writer.writeheader()

        phone_col = self._find_phone_column(headers, phone_column)

        for row in reader:
            self._annotate_row(row, phone_col, results_map)
            writer.writerow(row)

        return output.getvalue()

    def process_csv_stream(
        self, lines: Iterable[str], phone_column: str = "phone", batch_size: int = 100
    ) -> Iterator[list[PhoneVerification]]:
        """Process CSV lines as a stream, yielding batches of results.

        Lines are consumed lazily, one at a time, so the input is never held
        in memory as a whole. Each line is parsed on its own, so quoted
        fields that span several physical lines are not supported.

        Args:
            lines: Iterable of CSV lines (including header). One-shot
                iterators such as generators or file objects are fine.
            phone_column: Column name containing phone numbers
            batch_size: Number of results to accumulate before yielding

        Yields:
            Batches of PhoneVerification results

        Raises:
            ValueError: If no phone column can be identified in the header
                (raised when the generator is first advanced).
        """
        lines_iter = iter(lines)

        try:
            first_line = next(lines_iter)
        except StopIteration:
            logger.error("Empty CSV stream")
            return

        # Parse header (strip UTF-8 BOM if present)
        header_line = first_line.lstrip("\ufeff")
        headers = csv.DictReader(StringIO(header_line)).fieldnames or []
        phone_col = self._find_phone_column(headers, phone_column)

        if not phone_col:
            raise ValueError(f"Phone column '{phone_column}' not found in CSV")

        batch = []
        row_num = 1  # Header is row 1; incremented once per non-blank data line
        total_processed = 0

        for line in lines_iter:
            if not line.strip():
                continue
            row_num += 1

            try:
                # Parse single line
                row = next(csv.DictReader(StringIO(line), fieldnames=headers))
                phone = self._extract_phone(row, phone_col)

                if not phone:
                    logger.warning(f"Empty phone at row {row_num}")
                    continue

                batch.append(self.verifier.verify(phone))
                total_processed += 1

            except ValueError as e:
                logger.warning(f"Invalid phone at row {row_num}: {str(e)}")
                continue
            except Exception as e:
                # Best effort: a failing row must not abort the whole stream.
                logger.error(f"Error processing row {row_num}: {str(e)}")
                continue

            # Yield batch when full
            if len(batch) >= batch_size:
                logger.info(
                    f"Processed batch of {len(batch)} phones (total: {total_processed}, at row {row_num})"
                )
                yield batch
                batch = []

        # Yield remaining results
        if batch:
            logger.info(f"Processed final batch of {len(batch)} phones (total: {total_processed})")
            yield batch

        logger.info(f"Stream processing completed. Total processed: {total_processed}")

    def generate_results_csv_stream(
        self,
        original_lines: Iterable[str],
        results_stream: Iterator[list[PhoneVerification]],
        phone_column: str = "phone",
    ) -> Iterator[str]:
        """Generate CSV results as a stream, line by line.

        The header is yielded first, then ``results_stream`` is drained into
        a lookup table (so all results are held in memory), then one output
        line is yielded per non-blank input line. ``original_lines`` is
        iterated exactly once, so one-shot iterators work; it must not share
        its underlying iterator with ``results_stream``.

        Args:
            original_lines: Iterable of original CSV lines (including header)
            results_stream: Iterator of batched PhoneVerification results
            phone_column: Column name containing phone numbers

        Yields:
            CSV lines with verification results added. The header line ends
            with ``"\\n"``; data lines use the ``csv`` module's default line
            terminator.
        """
        lines_iter = iter(original_lines)

        try:
            first_line = next(lines_iter)
        except StopIteration:
            return

        # Read and yield modified header (strip UTF-8 BOM if present)
        header_line = first_line.lstrip("\ufeff")
        headers = list(csv.DictReader(StringIO(header_line)).fieldnames or [])
        output_headers = headers + ["line_type", "dnc"]

        # Go through csv.writer so headers containing commas or quotes are
        # escaped correctly; "\n" keeps simple headers byte-identical.
        header_out = StringIO()
        csv.writer(header_out, lineterminator="\n").writerow(output_headers)
        yield header_out.getvalue()

        phone_col = self._find_phone_column(headers, phone_column)

        # Build results lookup from stream
        results_map = {result.phone_number: result for batch in results_stream for result in batch}

        # Keep consuming the same iterator: it is already positioned on the
        # first data line, and re-calling iter() on a one-shot iterator would
        # not rewind it.
        for line in lines_iter:
            if not line.strip():
                continue

            row = next(csv.DictReader(StringIO(line), fieldnames=headers))
            self._annotate_row(row, phone_col, results_map)

            # Write row; surplus cells (None key) are dropped, not fatal.
            output = StringIO()
            csv.DictWriter(output, fieldnames=output_headers, extrasaction="ignore").writerow(row)
            yield output.getvalue()