Coverage for src / ai_lls_lib / files / file_service.py: 89%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 23:45 +0000

1"""File listing, filtering, and pagination with DynamoDB.""" 

2 

3import logging 

4import math 

5import os 

6from typing import TYPE_CHECKING, Any 

7 

8try: 

9 import boto3 

10 

11 HAS_BOTO3 = True 

12except ImportError: 

13 boto3 = None # type: ignore[assignment] 

14 HAS_BOTO3 = False 

15 

16if TYPE_CHECKING: 

17 from mypy_boto3_dynamodb.service_resource import Table 

18 

logger = logging.getLogger(__name__)

# Column names documented as valid ``sort_by`` values; SORT_KEY_MAP below
# carries exactly one key extractor per entry.
ALLOWED_SORT_COLUMNS = {"filename", "created_at", "status", "total_rows", "credits_charged"}
# Status values the status filter honours; any other value is ignored.
ALLOWED_STATUSES = {"completed", "processing", "failed", "pending"}

# Sort-key extractors, keyed by sort column.
#
# Every extractor returns one consistent type (str or int) even when the
# attribute is missing or stored as null, so ``list.sort`` never has to compare
# ``None`` against a string. Older records may use alternative attribute names
# (``file_name``/``filename``, ``total_rows``/``row_count``); both are checked.
SORT_KEY_MAP: dict[str, Any] = {
    # Case-insensitive ordering on whichever filename attribute is populated.
    "filename": lambda item: (item.get("file_name") or item.get("filename") or "").lower(),
    # ``or ""`` (not a .get default) so an explicit null also sorts as "".
    "created_at": lambda item: item.get("created_at") or "",
    "status": lambda item: item.get("status") or "",
    "total_rows": lambda item: int(item.get("total_rows", 0) or item.get("row_count", 0) or 0),
    "credits_charged": lambda item: int(item.get("credits_charged", 0) or 0),
}

31 

32 

33def _get_filename(item: dict[str, Any]) -> str: 

34 """Get filename from item, handling field name inconsistencies.""" 

35 return item.get("file_name") or item.get("filename") or "" 

36 

37 

def _map_file_fields(item: dict[str, Any]) -> dict[str, Any]:
    """Translate a stored file record into the frontend's field names.

    Args:
        item: File record as read from the table.

    Returns:
        Dict with the frontend keys. ``completed`` records additionally carry
        ``completed_at`` and ``result_available``; ``failed`` records carry
        ``error`` and ``failed_at`` and have ``file_url`` set to the literal
        ``"Failed"``.
    """

    def _first_int(*keys: str) -> int:
        # First truthy value among ``keys`` as an int; 0 when none is set.
        for key in keys:
            raw = item.get(key, 0)
            if raw:
                return int(raw)
        return 0

    name = _get_filename(item)
    status = item.get("status", "")
    has_failed = status == "failed"
    record_total = _first_int("total_rows", "row_count")

    # Failed files never expose a download link; the marker string replaces it.
    if has_failed:
        file_url = "Failed"
    else:
        file_url = item.get("result_url") or item.get("result_s3_url") or ""

    mapped: dict[str, Any] = {
        "file_reference": item.get("file_id", ""),
        "file_name": name,
        "last_updated": item.get("created_at", ""),
        "total_records": record_total,
        "file_url": file_url,
        "status": status,
        "processed_rows": _first_int("processed_rows"),
        "credits_charged": _first_int("credits_charged"),
        "credits_reversed": _first_int("credits_reversed"),
        "file_method": item.get("file_method", ""),
    }

    if status == "completed":
        mapped.update(
            completed_at=item.get("completed_at", ""),
            result_available=bool(file_url and file_url != "Failed"),
        )

    if has_failed:
        mapped.update(
            error=item.get("error", ""),
            failed_at=item.get("failed_at", ""),
        )

    return mapped

70 

71 

def _map_admin_fields(item: dict[str, Any]) -> dict[str, Any]:
    """Translate a stored file record into the admin-view shape.

    Alternative attribute names are tolerated (``created_at``/``uploaded_at``,
    ``total_rows``/``row_count``, ``input_key``/``s3_key``,
    ``output_key``/``result_key``). ``input_url`` and ``output_url`` are always
    returned as ``None`` placeholders.
    """
    file_id = item.get("file_id", "")
    name = _get_filename(item)
    created = item.get("created_at") or item.get("uploaded_at", "")
    rows = int(item.get("total_rows", 0) or item.get("row_count", 0) or 0)
    status = item.get("status", "")
    source_key = item.get("input_key") or item.get("s3_key")
    result_key = item.get("output_key") or item.get("result_key")

    return {
        "file_id": file_id,
        "filename": name,
        "created_at": created,
        "row_count": rows,
        "status": status,
        "input_key": source_key,
        "output_key": result_key,
        # URL slots are left empty here.
        "input_url": None,
        "output_url": None,
    }

85 

86 

87class FileService: 

88 """Manages file metadata listing with filtering and pagination. 

89 

90 DynamoDB table schema: 

91 - Hash key: file_id (S) 

92 - GSI: user-index on user_id (HASH) / created_at (RANGE) 

93 """ 

94 

95 table: "Table | None" 

96 

97 def __init__(self, table_name: str | None = None): 

98 """Initialize with DynamoDB table.""" 

99 if not HAS_BOTO3 or not boto3: 

100 raise RuntimeError("boto3 is required for FileService") 

101 

102 self.dynamodb = boto3.resource("dynamodb") 

103 self.table_name = table_name if table_name else os.environ["FILES_TABLE"] 

104 

105 try: 

106 self.table = self.dynamodb.Table(self.table_name) 

107 except Exception as e: 

108 logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}") 

109 self.table = None 

110 

111 def _query_all_user_files(self, user_id: str) -> list[dict[str, Any]]: 

112 """Query all files for a user via the user-index GSI.""" 

113 if not self.table: 

114 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

115 

116 items: list[dict[str, Any]] = [] 

117 kwargs: dict[str, Any] = { 

118 "IndexName": "user-index", 

119 "KeyConditionExpression": "user_id = :uid", 

120 "ExpressionAttributeValues": {":uid": user_id}, 

121 } 

122 

123 while True: 

124 response = self.table.query(**kwargs) 

125 items.extend(response.get("Items", [])) 

126 last_key = response.get("LastEvaluatedKey") 

127 if not last_key: 

128 break 

129 kwargs["ExclusiveStartKey"] = last_key 

130 

131 return items 

132 

133 def list_user_files( 

134 self, 

135 user_id: str, 

136 page: int = 1, 

137 limit: int = 20, 

138 search: str = "", 

139 sort_by: str = "created_at", 

140 sort_order: str = "desc", 

141 status_filter: str = "", 

142 ) -> dict[str, Any]: 

143 """List files for a user with filtering, sorting, and pagination. 

144 

145 Args: 

146 user_id: User identifier. 

147 page: Page number (1-indexed). 

148 limit: Items per page. 

149 search: Case-insensitive substring match on filename. 

150 sort_by: Column to sort by (from ALLOWED_SORT_COLUMNS). 

151 sort_order: 'asc' or 'desc'. 

152 status_filter: Filter by file status. 

153 

154 Returns: 

155 Dict with files list and pagination metadata. 

156 """ 

157 items = self._query_all_user_files(user_id) 

158 

159 # Apply status filter 

160 if status_filter and status_filter in ALLOWED_STATUSES: 

161 items = [i for i in items if i.get("status") == status_filter] 

162 

163 # Apply search filter 

164 if search: 

165 search_lower = search.lower() 

166 items = [i for i in items if search_lower in _get_filename(i).lower()] 

167 

168 # Sort 

169 sort_key = SORT_KEY_MAP.get(sort_by, SORT_KEY_MAP["created_at"]) 

170 reverse = sort_order != "asc" 

171 items.sort(key=sort_key, reverse=reverse) 

172 

173 # Pagination 

174 total_count = len(items) 

175 total_pages = max(1, math.ceil(total_count / limit)) 

176 start = (page - 1) * limit 

177 end = start + limit 

178 page_items = items[start:end] 

179 

180 return { 

181 "files": [_map_file_fields(item) for item in page_items], 

182 "page": page, 

183 "per_page": limit, 

184 "has_more": end < total_count, 

185 "total_pages": total_pages, 

186 "total_files_count": total_count, 

187 } 

188 

189 def list_user_files_admin( 

190 self, 

191 user_id: str, 

192 page: int = 1, 

193 limit: int = 20, 

194 ) -> dict[str, Any]: 

195 """Admin view of user files with raw S3 keys. 

196 

197 Returns raw S3 keys for the handler to generate presigned URLs. 

198 """ 

199 items = self._query_all_user_files(user_id) 

200 

201 # Sort newest first 

202 items.sort(key=lambda i: i.get("created_at", ""), reverse=True) 

203 

204 total = len(items) 

205 start = (page - 1) * limit 

206 end = start + limit 

207 page_items = items[start:end] 

208 

209 return { 

210 "files": [_map_admin_fields(item) for item in page_items], 

211 "total": total, 

212 "page": page, 

213 "limit": limit, 

214 }