Coverage for src / ai_lls_lib / apikeys / legacy_key_service.py: 83%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 23:45 +0000

1"""Legacy API key rotation and validation with PBKDF2 hashing. 

2 

3Legacy keys use different format and hashing than managed keys: 

4- Format: lls_{token_urlsafe(32)} (vs lls_mk_... for managed) 

5- Hashing: PBKDF2-SHA256 with salt (vs plain SHA-256 for managed) 

6- Key ID: key_{token_hex(8)} (vs mk_... for managed) 

7- Lookup: SHA-256 lookup_hash GSI for O(1) validation 

8""" 

9 

10import hashlib 

11import logging 

12import os 

13import secrets 

14from datetime import UTC, datetime 

15from typing import TYPE_CHECKING, Any 

16 

17from ai_lls_lib.key_management import compute_key_hash 

18 

19try: 

20 import boto3 

21 from botocore.exceptions import ClientError 

22 

23 HAS_BOTO3 = True 

24except ImportError: 

25 boto3 = None # type: ignore[assignment] 

26 HAS_BOTO3 = False 

27 

28if TYPE_CHECKING: 

29 from mypy_boto3_dynamodb.service_resource import Table 

30 

31logger = logging.getLogger(__name__) 

32 

33LEGACY_KEY_PREFIX = "lls_" 

34LEGACY_KEY_ID_PREFIX = "key_" 

35PBKDF2_ITERATIONS = 100_000 

36 

37 

38def compute_legacy_key_hash(key: str, salt: bytes | None = None) -> tuple[str, str]: 

39 """Compute PBKDF2-SHA256 hash of a legacy API key. 

40 

41 Args: 

42 key: Plaintext API key. 

43 salt: Optional salt bytes (generated if not provided). 

44 

45 Returns: 

46 Tuple of (hash_hex, salt_hex). 

47 """ 

48 if salt is None: 

49 salt = secrets.token_bytes(32) 

50 

51 hash_bytes = hashlib.pbkdf2_hmac("sha256", key.encode(), salt, PBKDF2_ITERATIONS) 

52 return hash_bytes.hex(), salt.hex() 

53 

54 

55class LegacyApiKeyService: 

56 """Manages legacy single-key-per-user API keys with PBKDF2 hashing. 

57 

58 DynamoDB table schema: 

59 - Hash key: api_key_id (S) 

60 - GSI: gsi_user on user_id (HASH) 

61 """ 

62 

63 table: "Table | None" 

64 

65 def __init__(self, table_name: str | None = None): 

66 """Initialize with DynamoDB table.""" 

67 if not HAS_BOTO3 or not boto3: 

68 raise RuntimeError("boto3 is required for LegacyApiKeyService") 

69 

70 self.dynamodb = boto3.resource("dynamodb") 

71 self.table_name = table_name if table_name else os.environ["API_KEYS_TABLE"] 

72 

73 try: 

74 self.table = self.dynamodb.Table(self.table_name) 

75 except Exception as e: 

76 logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}") 

77 self.table = None 

78 

79 def rotate_key(self, user_id: str, email: str | None = None) -> dict[str, Any]: 

80 """Rotate (or create) a user's legacy API key. 

81 

82 Deactivates all existing active keys for the user, then creates 

83 a new key with PBKDF2 hashing. 

84 

85 Args: 

86 user_id: User identifier. 

87 email: Optional user email stored for reference. 

88 

89 Returns: 

90 Dict with api_key_id, api_key (plaintext), created_at, last4, message. 

91 """ 

92 if not self.table: 

93 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

94 

95 # Find and deactivate existing keys 

96 self._deactivate_user_keys(user_id) 

97 

98 # Generate new key 

99 api_key = f"{LEGACY_KEY_PREFIX}{secrets.token_urlsafe(32)}" 

100 api_key_id = f"{LEGACY_KEY_ID_PREFIX}{secrets.token_hex(8)}" 

101 key_hash, salt_hex = compute_legacy_key_hash(api_key) 

102 lookup_hash = compute_key_hash(api_key) 

103 now = datetime.now(UTC).isoformat() 

104 

105 item: dict[str, Any] = { 

106 "api_key_id": api_key_id, 

107 "user_id": user_id, 

108 "key_hash": key_hash, 

109 "salt": salt_hex, 

110 "lookup_hash": lookup_hash, 

111 "created_at": now, 

112 "active": True, 

113 "last_used": None, 

114 "usage_count": 0, 

115 } 

116 if email: 

117 item["email"] = email 

118 

119 self.table.put_item(Item=item) 

120 

121 logger.info(f"Rotated legacy key for user {user_id}, new key_id: {api_key_id}") 

122 return { 

123 "api_key_id": api_key_id, 

124 "api_key": api_key, 

125 "created_at": now, 

126 "last4": api_key[-4:], 

127 "message": "API key rotated successfully", 

128 } 

129 

130 def _deactivate_user_keys(self, user_id: str) -> None: 

131 """Deactivate all active keys for a user via the gsi_user GSI.""" 

132 if not self.table: 

133 return 

134 

135 try: 

136 response = self.table.query( 

137 IndexName="gsi_user", 

138 KeyConditionExpression="user_id = :uid", 

139 ExpressionAttributeValues={":uid": user_id}, 

140 ) 

141 now = datetime.now(UTC).isoformat() 

142 

143 for item in response.get("Items", []): 

144 if item.get("active"): 

145 self.table.update_item( 

146 Key={"api_key_id": item["api_key_id"]}, 

147 UpdateExpression=("SET active = :false_val, deactivated_at = :now"), 

148 ExpressionAttributeValues={ 

149 ":false_val": False, 

150 ":now": now, 

151 }, 

152 ) 

153 except ClientError as e: 

154 logger.error(f"Error deactivating keys for {user_id}: {e}") 

155 raise 

156 

157 def validate_by_lookup_hash(self, api_key: str) -> dict[str, Any] | None: 

158 """Validate a legacy API key using the lookup_hash GSI (O(1) path). 

159 

160 Computes the SHA-256 lookup hash, queries the GSI, then verifies 

161 with PBKDF2 against the stored hash and salt. 

162 

163 Args: 

164 api_key: Plaintext API key to validate. 

165 

166 Returns: 

167 The DynamoDB item dict if valid, or None. 

168 """ 

169 if not self.table: 

170 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

171 

172 lookup_hash = compute_key_hash(api_key) 

173 

174 response = self.table.query( 

175 IndexName="gsi_lookup_hash", 

176 KeyConditionExpression="lookup_hash = :lh", 

177 ExpressionAttributeValues={":lh": lookup_hash}, 

178 ) 

179 

180 for item in response.get("Items", []): 

181 if not item.get("active"): 

182 continue 

183 stored_hash = str(item["key_hash"]) 

184 salt_bytes = bytes.fromhex(str(item["salt"])) 

185 computed_hash, _ = compute_legacy_key_hash(api_key, salt_bytes) 

186 if computed_hash == stored_hash: 

187 return dict(item) 

188 

189 return None 

190 

191 def backfill_lookup_hash(self, api_key_id: str, lookup_hash: str) -> None: 

192 """Write lookup_hash to an existing item for GSI indexing. 

193 

194 Args: 

195 api_key_id: The key's primary key. 

196 lookup_hash: SHA-256 hex digest of the plaintext key. 

197 """ 

198 if not self.table: 

199 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

200 

201 self.table.update_item( 

202 Key={"api_key_id": api_key_id}, 

203 UpdateExpression="SET lookup_hash = :lh", 

204 ExpressionAttributeValues={":lh": lookup_hash}, 

205 ) 

206 logger.info(f"Backfilled lookup_hash for {api_key_id}") 

207 

208 def validate_key(self, api_key: str) -> dict[str, Any] | None: 

209 """Validate a legacy API key using GSI-first with scan fallback. 

210 

211 Tries the O(1) lookup_hash GSI path first. If no match (key 

212 predates the GSI), falls back to a full table scan with PBKDF2 

213 verification and lazily backfills the lookup_hash. 

214 

215 Args: 

216 api_key: Plaintext API key to validate. 

217 

218 Returns: 

219 The DynamoDB item dict if valid, or None. 

220 """ 

221 if not self.table: 

222 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

223 

224 # O(1) GSI path 

225 result = self.validate_by_lookup_hash(api_key) 

226 if result: 

227 return result 

228 

229 # O(n) scan fallback for items without lookup_hash 

230 lookup_hash = compute_key_hash(api_key) 

231 response = self.table.scan() 

232 

233 for item in response.get("Items", []): 

234 if not item.get("active"): 

235 continue 

236 salt_hex = item.get("salt") 

237 if not salt_hex: 

238 continue 

239 salt_bytes = bytes.fromhex(str(salt_hex)) 

240 computed_hash, _ = compute_legacy_key_hash(api_key, salt_bytes) 

241 if computed_hash == str(item["key_hash"]): 

242 # Lazy backfill 

243 self.backfill_lookup_hash(str(item["api_key_id"]), lookup_hash) 

244 return dict(item) 

245 

246 return None