Coverage for src / ai_lls_lib / core / processor.py: 96%

157 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 23:45 +0000

1""" 

2Bulk CSV processing for phone verification 

3""" 

4 

5import csv 

6from collections.abc import Iterable, Iterator, Sequence 

7from io import StringIO 

8 

9from aws_lambda_powertools import Logger 

10 

11from .models import PhoneVerification 

12from .verifier import PhoneVerifier 

13 

# Module-level AWS Lambda Powertools logger; BulkProcessor uses it for
# progress messages and per-row warnings/errors.
logger = Logger()

15 

16 

class BulkProcessor:
    """Process CSV files for bulk phone verification.

    Verification itself is delegated to the injected ``PhoneVerifier``
    (``verify`` and ``normalize_phone``). This class handles CSV parsing,
    phone-column detection, per-row error isolation and writing the
    annotated output CSV.
    """

    # Header substrings tried, in priority order, when the caller's preferred
    # phone column is not present in the header row.
    _PHONE_PATTERNS: tuple[str, ...] = (
        "phone",
        "phone_number",
        "phonenumber",
        "mobile",
        "cell",
        "telephone",
        "tel",
        "number",
        "contact",
    )

    # Columns appended to the output by the generate_results_* methods.
    _RESULT_COLUMNS: tuple[str, ...] = ("line_type", "dnc")

    def __init__(self, verifier: PhoneVerifier):
        self.verifier = verifier

    def process_csv(self, csv_text: str, phone_column: str = "phone") -> list[PhoneVerification]:
        """
        Verify every phone number in an in-memory CSV document.

        Args:
            csv_text: Full CSV text including the header row. A leading UTF-8
                BOM is ignored.
            phone_column: Preferred phone column name (matched
                case-insensitively). If absent, a column is guessed from
                common phone-like header names.

        Returns:
            Verification results, in input order, for rows whose phone was
            verified. Rows with an empty phone, or whose verification raised,
            are logged and skipped.

        Raises:
            ValueError: If no phone column can be identified.
        """
        results: list[PhoneVerification] = []

        try:
            # Strip UTF-8 BOM if present (Excel on Windows adds this)
            reader = csv.DictReader(StringIO(csv_text.lstrip("\ufeff")))

            phone_col = self._find_phone_column(reader.fieldnames or [], phone_column)
            if not phone_col:
                raise ValueError(f"Phone column '{phone_column}' not found in CSV")

            logger.info(f"Starting CSV processing using phone column '{phone_col}'")

            # Data starts at row 2 because the header is row 1.
            for row_num, row in enumerate(reader, start=2):
                try:
                    phone = self._extract_phone(row, phone_col)
                    if not phone:
                        logger.warning(f"Empty phone at row {row_num}")
                        continue
                    result = self.verifier.verify(phone)
                except ValueError as e:
                    logger.warning(f"Invalid phone at row {row_num}: {str(e)}")
                    continue
                except Exception as e:
                    # One bad row must not abort the whole file.
                    logger.error(f"Error processing row {row_num}: {str(e)}")
                    continue

                results.append(result)

                # Log progress every 100 verified phones
                if len(results) % 100 == 0:
                    logger.info(f"Processed {len(results)} phones (at row {row_num})")

            logger.info(f"Completed processing {len(results)} valid phones")

        except Exception as e:
            logger.error(f"CSV processing failed: {str(e)}")
            raise

        return results

    def _find_phone_column(self, headers: list[str] | Sequence[str], preferred: str) -> str | None:
        """
        Pick the header that holds phone numbers.

        An exact, case-insensitive match on ``preferred`` wins. Otherwise the
        first header containing one of ``_PHONE_PATTERNS`` is used. Patterns
        are tried in priority order, so that e.g. a ``mobile`` column beats a
        ``contact_name`` column that merely appears earlier in the header row.

        Returns:
            The header name as written in the CSV, or None when nothing matches.
        """
        wanted = preferred.lower()
        lowered = [(header, header.lower()) for header in headers]

        for header, header_lower in lowered:
            if header_lower == wanted:
                return header

        for pattern in self._PHONE_PATTERNS:
            for header, header_lower in lowered:
                if pattern in header_lower:
                    logger.info(f"Using column '{header}' as phone column")
                    return header

        return None

    @staticmethod
    def _extract_phone(row: dict | None, phone_col: str | None) -> str:
        """Return the stripped phone cell of ``row``, or "" when there is none."""
        if row is None or phone_col is None:
            return ""
        # DictReader fills cells missing from short rows with None, not "".
        return (row.get(phone_col) or "").strip()

    @staticmethod
    def _parse_line(line: str, headers: Sequence[str]) -> dict | None:
        """
        Parse a single CSV line into a dict keyed by ``headers``.

        Returns None if the line holds no record. Lines are parsed
        independently, so quoted fields that span several lines are not
        supported by the streaming methods.
        """
        return next(csv.DictReader(StringIO(line), fieldnames=headers), None)

    def _output_headers(self, headers: list[str]) -> list[str]:
        """Return the input headers plus any result columns not already present."""
        return headers + [col for col in self._RESULT_COLUMNS if col not in headers]

    def _annotate_row(self, row: dict, phone_col: str | None, results_map: dict) -> dict:
        """
        Fill the ``line_type``/``dnc`` cells of ``row`` in place and return it.

        - With a matching result: the result's ``line_type.value`` and
          ``"true"``/``"false"`` for DNC.
        - When the phone normalizes but has no result: ``"unknown"`` / ``""``.
        - When ``normalize_phone`` raises: ``"invalid"`` / ``""``.
        """
        # Cells beyond the header width are stored under the None key.
        # DictWriter rejects that key, so drop them with a warning instead
        # of failing the whole export.
        overflow = row.pop(None, None)
        if overflow:
            logger.warning(f"Dropping {len(overflow)} extra field(s) not covered by the CSV header")

        phone = self._extract_phone(row, phone_col)
        try:
            normalized = self.verifier.normalize_phone(phone)
        except Exception:
            # Best-effort export: a number that cannot be normalized is
            # flagged instead of aborting the file.
            row["line_type"] = "invalid"
            row["dnc"] = ""
            return row

        result = results_map.get(normalized)
        if result is None:
            row["line_type"] = "unknown"
            row["dnc"] = ""
        else:
            row["line_type"] = result.line_type.value
            row["dnc"] = "true" if result.dnc else "false"
        return row

    def generate_results_csv(
        self,
        original_csv_text: str,
        results: list[PhoneVerification],
        phone_column: str = "phone",
    ) -> str:
        """
        Generate CSV with original data plus verification results.

        Adds columns ``line_type`` and ``dnc``. If the input already has
        columns with those names, they are reused rather than duplicated.

        Args:
            original_csv_text: The CSV text that was verified (a leading UTF-8
                BOM is ignored).
            results: Results to merge. A row matches when its phone, after
                ``verifier.normalize_phone``, equals a result's
                ``phone_number``.
            phone_column: Preferred phone column name, as in ``process_csv``.

        Returns:
            CSV text string in the ``csv`` module's default dialect.
        """
        results_map = {r.phone_number: r for r in results}

        # Parse original CSV (strip UTF-8 BOM if present)
        reader = csv.DictReader(StringIO(original_csv_text.lstrip("\ufeff")))
        headers = list(reader.fieldnames or [])
        phone_col = self._find_phone_column(headers, phone_column)

        output = StringIO()
        writer = csv.DictWriter(output, fieldnames=self._output_headers(headers))
        writer.writeheader()
        for row in reader:
            writer.writerow(self._annotate_row(row, phone_col, results_map))

        return output.getvalue()

    def process_csv_stream(
        self, lines: Iterable[str], phone_column: str = "phone", batch_size: int = 100
    ) -> Iterator[list[PhoneVerification]]:
        """
        Process CSV lines as a stream, yielding batches of results.

        Lines are consumed lazily, one at a time, so the input is never held in
        memory as a whole. Each line is parsed on its own (see ``_parse_line``).

        Args:
            lines: Iterable of CSV lines, header first (e.g. an open text file).
            phone_column: Column name containing phone numbers
            batch_size: Number of results to accumulate before yielding

        Yields:
            Batches of PhoneVerification results; the final batch may be smaller.

        Raises:
            ValueError: If no phone column can be identified. Because this is a
                generator, the error surfaces on the first iteration.
        """
        line_iter = iter(lines)
        header_line = next(line_iter, None)
        if header_line is None:
            logger.error("Empty CSV stream")
            return

        # Parse header (strip UTF-8 BOM if present)
        headers = csv.DictReader(StringIO(header_line.lstrip("\ufeff"))).fieldnames or []
        phone_col = self._find_phone_column(headers, phone_column)
        if not phone_col:
            raise ValueError(f"Phone column '{phone_column}' not found in CSV")

        batch: list[PhoneVerification] = []
        total_processed = 0
        row_num = 1  # Header is row 1; blank lines are not counted.

        for line in line_iter:
            if not line.strip():
                continue
            row_num += 1  # Exactly one increment per non-blank data line.

            try:
                phone = self._extract_phone(self._parse_line(line, headers), phone_col)
                if not phone:
                    logger.warning(f"Empty phone at row {row_num}")
                    continue
                result = self.verifier.verify(phone)
            except ValueError as e:
                logger.warning(f"Invalid phone at row {row_num}: {str(e)}")
                continue
            except Exception as e:
                logger.error(f"Error processing row {row_num}: {str(e)}")
                continue

            batch.append(result)
            total_processed += 1

            # Yield outside the try so an exception thrown into the generator
            # by the consumer is not logged as a row error and swallowed.
            if len(batch) >= batch_size:
                logger.info(
                    f"Processed batch of {len(batch)} phones (total: {total_processed}, at row {row_num})"
                )
                yield batch
                batch = []

        # Yield remaining results
        if batch:
            logger.info(f"Processed final batch of {len(batch)} phones (total: {total_processed})")
            yield batch

        logger.info(f"Stream processing completed. Total processed: {total_processed}")

    def generate_results_csv_stream(
        self,
        original_lines: Iterable[str],
        results_stream: Iterator[list[PhoneVerification]],
        phone_column: str = "phone",
    ) -> Iterator[str]:
        """
        Generate CSV results as a stream, line by line.

        ``original_lines`` is traversed exactly once, so one-shot iterables
        such as generators or open files work. ``results_stream`` is drained
        completely into a lookup table before the first data row is emitted,
        so every result is held in memory at once.

        Args:
            original_lines: Iterable of original CSV lines, header first
            results_stream: Iterator of batched PhoneVerification results
            phone_column: Column name containing phone numbers

        Yields:
            The header line, then one CSV-formatted line per non-blank input
            line, each with ``line_type`` and ``dnc`` values added. All lines
            use the ``csv`` module's default dialect (quoted as needed,
            ``"\\r\\n"`` terminated).
        """
        lines_iter = iter(original_lines)
        header_line = next(lines_iter, None)
        if header_line is None:
            return

        # Parse header (strip UTF-8 BOM if present)
        headers = list(csv.DictReader(StringIO(header_line.lstrip("\ufeff"))).fieldnames or [])

        # A single buffer/writer pair formats every output line, so the header
        # gets the same quoting and line terminator as the data rows.
        buffer = StringIO()
        writer = csv.DictWriter(buffer, fieldnames=self._output_headers(headers))

        def drain() -> str:
            """Return what the writer has buffered since the last call, then clear it."""
            text = buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)
            return text

        writer.writeheader()
        yield drain()

        phone_col = self._find_phone_column(headers, phone_column)

        # Build results lookup from stream
        results_map = {result.phone_number: result for batch in results_stream for result in batch}

        # Keep consuming the same iterator: calling iter() again on a one-shot
        # iterable would return the same, already-advanced object.
        for line in lines_iter:
            if not line.strip():
                continue

            row = self._parse_line(line, headers)
            if row is None:
                continue

            writer.writerow(self._annotate_row(row, phone_col, results_map))
            yield drain()