Coverage for src / ai_lls_lib / apikeys / legacy_key_service.py: 83%
101 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 23:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 23:45 +0000
1"""Legacy API key rotation and validation with PBKDF2 hashing.
3Legacy keys use different format and hashing than managed keys:
4- Format: lls_{token_urlsafe(32)} (vs lls_mk_... for managed)
5- Hashing: PBKDF2-SHA256 with salt (vs plain SHA-256 for managed)
6- Key ID: key_{token_hex(8)} (vs mk_... for managed)
7- Lookup: SHA-256 lookup_hash GSI for O(1) validation
8"""
10import hashlib
11import logging
12import os
13import secrets
14from datetime import UTC, datetime
15from typing import TYPE_CHECKING, Any
17from ai_lls_lib.key_management import compute_key_hash
19try:
20 import boto3
21 from botocore.exceptions import ClientError
23 HAS_BOTO3 = True
24except ImportError:
25 boto3 = None # type: ignore[assignment]
26 HAS_BOTO3 = False
28if TYPE_CHECKING:
29 from mypy_boto3_dynamodb.service_resource import Table
31logger = logging.getLogger(__name__)
33LEGACY_KEY_PREFIX = "lls_"
34LEGACY_KEY_ID_PREFIX = "key_"
35PBKDF2_ITERATIONS = 100_000
38def compute_legacy_key_hash(key: str, salt: bytes | None = None) -> tuple[str, str]:
39 """Compute PBKDF2-SHA256 hash of a legacy API key.
41 Args:
42 key: Plaintext API key.
43 salt: Optional salt bytes (generated if not provided).
45 Returns:
46 Tuple of (hash_hex, salt_hex).
47 """
48 if salt is None:
49 salt = secrets.token_bytes(32)
51 hash_bytes = hashlib.pbkdf2_hmac("sha256", key.encode(), salt, PBKDF2_ITERATIONS)
52 return hash_bytes.hex(), salt.hex()
55class LegacyApiKeyService:
56 """Manages legacy single-key-per-user API keys with PBKDF2 hashing.
58 DynamoDB table schema:
59 - Hash key: api_key_id (S)
60 - GSI: gsi_user on user_id (HASH)
61 """
63 table: "Table | None"
65 def __init__(self, table_name: str | None = None):
66 """Initialize with DynamoDB table."""
67 if not HAS_BOTO3 or not boto3:
68 raise RuntimeError("boto3 is required for LegacyApiKeyService")
70 self.dynamodb = boto3.resource("dynamodb")
71 self.table_name = table_name if table_name else os.environ["API_KEYS_TABLE"]
73 try:
74 self.table = self.dynamodb.Table(self.table_name)
75 except Exception as e:
76 logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}")
77 self.table = None
79 def rotate_key(self, user_id: str, email: str | None = None) -> dict[str, Any]:
80 """Rotate (or create) a user's legacy API key.
82 Deactivates all existing active keys for the user, then creates
83 a new key with PBKDF2 hashing.
85 Args:
86 user_id: User identifier.
87 email: Optional user email stored for reference.
89 Returns:
90 Dict with api_key_id, api_key (plaintext), created_at, last4, message.
91 """
92 if not self.table:
93 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
95 # Find and deactivate existing keys
96 self._deactivate_user_keys(user_id)
98 # Generate new key
99 api_key = f"{LEGACY_KEY_PREFIX}{secrets.token_urlsafe(32)}"
100 api_key_id = f"{LEGACY_KEY_ID_PREFIX}{secrets.token_hex(8)}"
101 key_hash, salt_hex = compute_legacy_key_hash(api_key)
102 lookup_hash = compute_key_hash(api_key)
103 now = datetime.now(UTC).isoformat()
105 item: dict[str, Any] = {
106 "api_key_id": api_key_id,
107 "user_id": user_id,
108 "key_hash": key_hash,
109 "salt": salt_hex,
110 "lookup_hash": lookup_hash,
111 "created_at": now,
112 "active": True,
113 "last_used": None,
114 "usage_count": 0,
115 }
116 if email:
117 item["email"] = email
119 self.table.put_item(Item=item)
121 logger.info(f"Rotated legacy key for user {user_id}, new key_id: {api_key_id}")
122 return {
123 "api_key_id": api_key_id,
124 "api_key": api_key,
125 "created_at": now,
126 "last4": api_key[-4:],
127 "message": "API key rotated successfully",
128 }
130 def _deactivate_user_keys(self, user_id: str) -> None:
131 """Deactivate all active keys for a user via the gsi_user GSI."""
132 if not self.table:
133 return
135 try:
136 response = self.table.query(
137 IndexName="gsi_user",
138 KeyConditionExpression="user_id = :uid",
139 ExpressionAttributeValues={":uid": user_id},
140 )
141 now = datetime.now(UTC).isoformat()
143 for item in response.get("Items", []):
144 if item.get("active"):
145 self.table.update_item(
146 Key={"api_key_id": item["api_key_id"]},
147 UpdateExpression=("SET active = :false_val, deactivated_at = :now"),
148 ExpressionAttributeValues={
149 ":false_val": False,
150 ":now": now,
151 },
152 )
153 except ClientError as e:
154 logger.error(f"Error deactivating keys for {user_id}: {e}")
155 raise
157 def validate_by_lookup_hash(self, api_key: str) -> dict[str, Any] | None:
158 """Validate a legacy API key using the lookup_hash GSI (O(1) path).
160 Computes the SHA-256 lookup hash, queries the GSI, then verifies
161 with PBKDF2 against the stored hash and salt.
163 Args:
164 api_key: Plaintext API key to validate.
166 Returns:
167 The DynamoDB item dict if valid, or None.
168 """
169 if not self.table:
170 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
172 lookup_hash = compute_key_hash(api_key)
174 response = self.table.query(
175 IndexName="gsi_lookup_hash",
176 KeyConditionExpression="lookup_hash = :lh",
177 ExpressionAttributeValues={":lh": lookup_hash},
178 )
180 for item in response.get("Items", []):
181 if not item.get("active"):
182 continue
183 stored_hash = str(item["key_hash"])
184 salt_bytes = bytes.fromhex(str(item["salt"]))
185 computed_hash, _ = compute_legacy_key_hash(api_key, salt_bytes)
186 if computed_hash == stored_hash:
187 return dict(item)
189 return None
191 def backfill_lookup_hash(self, api_key_id: str, lookup_hash: str) -> None:
192 """Write lookup_hash to an existing item for GSI indexing.
194 Args:
195 api_key_id: The key's primary key.
196 lookup_hash: SHA-256 hex digest of the plaintext key.
197 """
198 if not self.table:
199 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
201 self.table.update_item(
202 Key={"api_key_id": api_key_id},
203 UpdateExpression="SET lookup_hash = :lh",
204 ExpressionAttributeValues={":lh": lookup_hash},
205 )
206 logger.info(f"Backfilled lookup_hash for {api_key_id}")
208 def validate_key(self, api_key: str) -> dict[str, Any] | None:
209 """Validate a legacy API key using GSI-first with scan fallback.
211 Tries the O(1) lookup_hash GSI path first. If no match (key
212 predates the GSI), falls back to a full table scan with PBKDF2
213 verification and lazily backfills the lookup_hash.
215 Args:
216 api_key: Plaintext API key to validate.
218 Returns:
219 The DynamoDB item dict if valid, or None.
220 """
221 if not self.table:
222 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
224 # O(1) GSI path
225 result = self.validate_by_lookup_hash(api_key)
226 if result:
227 return result
229 # O(n) scan fallback for items without lookup_hash
230 lookup_hash = compute_key_hash(api_key)
231 response = self.table.scan()
233 for item in response.get("Items", []):
234 if not item.get("active"):
235 continue
236 salt_hex = item.get("salt")
237 if not salt_hex:
238 continue
239 salt_bytes = bytes.fromhex(str(salt_hex))
240 computed_hash, _ = compute_legacy_key_hash(api_key, salt_bytes)
241 if computed_hash == str(item["key_hash"]):
242 # Lazy backfill
243 self.backfill_lookup_hash(str(item["api_key_id"]), lookup_hash)
244 return dict(item)
246 return None