Coverage for src / ai_lls_lib / files / file_service.py: 89%
82 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 23:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 23:45 +0000
1"""File listing, filtering, and pagination with DynamoDB."""
3import logging
4import math
5import os
6from typing import TYPE_CHECKING, Any
8try:
9 import boto3
11 HAS_BOTO3 = True
12except ImportError:
13 boto3 = None # type: ignore[assignment]
14 HAS_BOTO3 = False
16if TYPE_CHECKING:
17 from mypy_boto3_dynamodb.service_resource import Table
19logger = logging.getLogger(__name__)
21ALLOWED_SORT_COLUMNS = {"filename", "created_at", "status", "total_rows", "credits_charged"}
22ALLOWED_STATUSES = {"completed", "processing", "failed", "pending"}
24SORT_KEY_MAP: dict[str, Any] = {
25 "filename": lambda item: (item.get("file_name") or item.get("filename") or "").lower(),
26 "created_at": lambda item: item.get("created_at", ""),
27 "status": lambda item: item.get("status", ""),
28 "total_rows": lambda item: int(item.get("total_rows", 0) or item.get("row_count", 0) or 0),
29 "credits_charged": lambda item: int(item.get("credits_charged", 0) or 0),
30}
33def _get_filename(item: dict[str, Any]) -> str:
34 """Get filename from item, handling field name inconsistencies."""
35 return item.get("file_name") or item.get("filename") or ""
38def _map_file_fields(item: dict[str, Any]) -> dict[str, Any]:
39 """Map DynamoDB item fields to frontend field names."""
40 filename = _get_filename(item)
41 status = item.get("status", "")
42 total_records = int(item.get("total_rows", 0) or item.get("row_count", 0) or 0)
44 file_url = item.get("result_url") or item.get("result_s3_url") or ""
45 if status == "failed":
46 file_url = "Failed"
48 mapped = {
49 "file_reference": item.get("file_id", ""),
50 "file_name": filename,
51 "last_updated": item.get("created_at", ""),
52 "total_records": total_records,
53 "file_url": file_url,
54 "status": status,
55 "processed_rows": int(item.get("processed_rows", 0) or 0),
56 "credits_charged": int(item.get("credits_charged", 0) or 0),
57 "credits_reversed": int(item.get("credits_reversed", 0) or 0),
58 "file_method": item.get("file_method", ""),
59 }
61 if status == "completed":
62 mapped["completed_at"] = item.get("completed_at", "")
63 mapped["result_available"] = bool(file_url and file_url != "Failed")
65 if status == "failed":
66 mapped["error"] = item.get("error", "")
67 mapped["failed_at"] = item.get("failed_at", "")
69 return mapped
72def _map_admin_fields(item: dict[str, Any]) -> dict[str, Any]:
73 """Map DynamoDB item fields for admin view."""
74 return {
75 "file_id": item.get("file_id", ""),
76 "filename": _get_filename(item),
77 "created_at": item.get("created_at") or item.get("uploaded_at", ""),
78 "row_count": int(item.get("total_rows", 0) or item.get("row_count", 0) or 0),
79 "status": item.get("status", ""),
80 "input_key": item.get("input_key") or item.get("s3_key"),
81 "output_key": item.get("output_key") or item.get("result_key"),
82 "input_url": None,
83 "output_url": None,
84 }
87class FileService:
88 """Manages file metadata listing with filtering and pagination.
90 DynamoDB table schema:
91 - Hash key: file_id (S)
92 - GSI: user-index on user_id (HASH) / created_at (RANGE)
93 """
95 table: "Table | None"
97 def __init__(self, table_name: str | None = None):
98 """Initialize with DynamoDB table."""
99 if not HAS_BOTO3 or not boto3:
100 raise RuntimeError("boto3 is required for FileService")
102 self.dynamodb = boto3.resource("dynamodb")
103 self.table_name = table_name if table_name else os.environ["FILES_TABLE"]
105 try:
106 self.table = self.dynamodb.Table(self.table_name)
107 except Exception as e:
108 logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}")
109 self.table = None
111 def _query_all_user_files(self, user_id: str) -> list[dict[str, Any]]:
112 """Query all files for a user via the user-index GSI."""
113 if not self.table:
114 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
116 items: list[dict[str, Any]] = []
117 kwargs: dict[str, Any] = {
118 "IndexName": "user-index",
119 "KeyConditionExpression": "user_id = :uid",
120 "ExpressionAttributeValues": {":uid": user_id},
121 }
123 while True:
124 response = self.table.query(**kwargs)
125 items.extend(response.get("Items", []))
126 last_key = response.get("LastEvaluatedKey")
127 if not last_key:
128 break
129 kwargs["ExclusiveStartKey"] = last_key
131 return items
133 def list_user_files(
134 self,
135 user_id: str,
136 page: int = 1,
137 limit: int = 20,
138 search: str = "",
139 sort_by: str = "created_at",
140 sort_order: str = "desc",
141 status_filter: str = "",
142 ) -> dict[str, Any]:
143 """List files for a user with filtering, sorting, and pagination.
145 Args:
146 user_id: User identifier.
147 page: Page number (1-indexed).
148 limit: Items per page.
149 search: Case-insensitive substring match on filename.
150 sort_by: Column to sort by (from ALLOWED_SORT_COLUMNS).
151 sort_order: 'asc' or 'desc'.
152 status_filter: Filter by file status.
154 Returns:
155 Dict with files list and pagination metadata.
156 """
157 items = self._query_all_user_files(user_id)
159 # Apply status filter
160 if status_filter and status_filter in ALLOWED_STATUSES:
161 items = [i for i in items if i.get("status") == status_filter]
163 # Apply search filter
164 if search:
165 search_lower = search.lower()
166 items = [i for i in items if search_lower in _get_filename(i).lower()]
168 # Sort
169 sort_key = SORT_KEY_MAP.get(sort_by, SORT_KEY_MAP["created_at"])
170 reverse = sort_order != "asc"
171 items.sort(key=sort_key, reverse=reverse)
173 # Pagination
174 total_count = len(items)
175 total_pages = max(1, math.ceil(total_count / limit))
176 start = (page - 1) * limit
177 end = start + limit
178 page_items = items[start:end]
180 return {
181 "files": [_map_file_fields(item) for item in page_items],
182 "page": page,
183 "per_page": limit,
184 "has_more": end < total_count,
185 "total_pages": total_pages,
186 "total_files_count": total_count,
187 }
189 def list_user_files_admin(
190 self,
191 user_id: str,
192 page: int = 1,
193 limit: int = 20,
194 ) -> dict[str, Any]:
195 """Admin view of user files with raw S3 keys.
197 Returns raw S3 keys for the handler to generate presigned URLs.
198 """
199 items = self._query_all_user_files(user_id)
201 # Sort newest first
202 items.sort(key=lambda i: i.get("created_at", ""), reverse=True)
204 total = len(items)
205 start = (page - 1) * limit
206 end = start + limit
207 page_items = items[start:end]
209 return {
210 "files": [_map_admin_fields(item) for item in page_items],
211 "total": total,
212 "page": page,
213 "limit": limit,
214 }