Coverage for src / ai_lls_lib / auth / context_parser.py: 100%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 23:45 +0000

1"""Auth context parser for HTTP API v2.0 events.""" 

2 

3import functools 

4import json 

5from collections.abc import Callable 

6from typing import Any 

7 

8 

9def get_user_from_event(event: dict[str, Any]) -> str | None: 

10 """ 

11 Extract user ID from HTTP API v2.0 event with all possible paths. 

12 Handles both JWT and API key authentication contexts. 

13 

14 This function handles the complexities of AWS API Gateway authorizer contexts, 

15 especially when EnableSimpleResponses is set to false, which wraps the 

16 context in a 'lambda' key. 

17 

18 Args: 

19 event: The Lambda event from API Gateway HTTP API v2.0 

20 

21 Returns: 

22 User ID string if found, None otherwise 

23 """ 

24 request_context = event.get("requestContext", {}) 

25 auth = request_context.get("authorizer", {}) 

26 

27 # Handle lambda-wrapped context (EnableSimpleResponses: false) 

28 # When EnableSimpleResponses is false, the authorizer context is wrapped 

29 lam_ctx = auth.get("lambda", auth) if isinstance(auth.get("lambda"), dict) else auth 

30 

31 # Try all possible paths for user_id in priority order 

32 user_id = ( 

33 # Lambda authorizer paths (most common with current setup) 

34 lam_ctx.get("principal_id") 

35 or lam_ctx.get("principalId") 

36 or lam_ctx.get("sub") 

37 or lam_ctx.get("user_id") 

38 or 

39 # JWT paths (when using JWT authorizer directly) 

40 auth.get("jwt", {}).get("claims", {}).get("sub") 

41 or 

42 # Direct auth paths (fallback) 

43 auth.get("principal_id") 

44 or auth.get("principalId") 

45 or auth.get("sub") 

46 ) 

47 

48 return str(user_id) if user_id else None 

49 

50 

51def get_email_from_event(event: dict[str, Any]) -> str | None: 

52 """ 

53 Extract email from HTTP API v2.0 event. 

54 

55 Args: 

56 event: The Lambda event from API Gateway HTTP API v2.0 

57 

58 Returns: 

59 Email string if found, None otherwise 

60 """ 

61 request_context = event.get("requestContext", {}) 

62 auth = request_context.get("authorizer", {}) 

63 

64 # Handle lambda-wrapped context 

65 lam_ctx = auth.get("lambda", auth) if isinstance(auth.get("lambda"), dict) else auth 

66 

67 # Try to get email from various locations 

68 email = ( 

69 lam_ctx.get("email") 

70 or auth.get("jwt", {}).get("claims", {}).get("email") 

71 or auth.get("email") 

72 ) 

73 

74 return str(email) if email else None 

75 

76 

77def is_admin(event: dict[str, Any]) -> bool: 

78 """ 

79 Check if the authenticated user has admin privileges. 

80 

81 The is_admin flag is set by the authorizer based on Cognito group membership. 

82 API key authentication never has admin access. 

83 

84 Args: 

85 event: The Lambda event from API Gateway HTTP API v2.0 

86 

87 Returns: 

88 True if user is an admin, False otherwise 

89 """ 

90 request_context = event.get("requestContext", {}) 

91 auth = request_context.get("authorizer", {}) 

92 

93 # Handle lambda-wrapped context 

94 lam_ctx = auth.get("lambda", auth) if isinstance(auth.get("lambda"), dict) else auth 

95 

96 # Check is_admin from authorizer context (set as string 'true' or 'false') 

97 is_admin_str = lam_ctx.get("is_admin", "false") 

98 

99 # Handle both string and boolean values 

100 if isinstance(is_admin_str, bool): 

101 return is_admin_str 

102 return str(is_admin_str).lower() == "true" 

103 

104 

105def get_groups_from_event(event: dict[str, Any]) -> list[str]: 

106 """ 

107 Extract user groups from HTTP API v2.0 event. 

108 

109 Groups are passed as a comma-separated string in the authorizer context. 

110 

111 Args: 

112 event: The Lambda event from API Gateway HTTP API v2.0 

113 

114 Returns: 

115 List of group names, empty list if none 

116 """ 

117 request_context = event.get("requestContext", {}) 

118 auth = request_context.get("authorizer", {}) 

119 

120 # Handle lambda-wrapped context 

121 lam_ctx = auth.get("lambda", auth) if isinstance(auth.get("lambda"), dict) else auth 

122 

123 # Groups are stored as comma-separated string 

124 groups_str = lam_ctx.get("groups", "") 

125 

126 if not groups_str: 

127 return [] 

128 

129 return [g.strip() for g in groups_str.split(",") if g.strip()] 

130 

131 

132def require_admin(func: Callable) -> Callable: 

133 """ 

134 Decorator that requires admin access for a handler function. 

135 

136 Returns 403 Forbidden if the user is not an admin. 

137 

138 Usage: 

139 @require_admin 

140 def handler(event, context): 

141 # Only runs if user is admin 

142 ... 

143 """ 

144 

145 @functools.wraps(func) 

146 def wrapper(event: dict[str, Any], context: Any) -> dict[str, Any]: 

147 if not is_admin(event): 

148 return { 

149 "statusCode": 403, 

150 "headers": {"Content-Type": "application/json"}, 

151 "body": json.dumps({"error": "Admin access required"}), 

152 } 

153 result: dict[str, Any] = func(event, context) 

154 return result 

155 

156 return wrapper 

157 

158 

159def get_auth_type(event: dict[str, Any]) -> str | None: 

160 """ 

161 Get the authentication type used for this request. 

162 

163 Args: 

164 event: The Lambda event from API Gateway HTTP API v2.0 

165 

166 Returns: 

167 'jwt' for Cognito token auth, 'api_key' for API key auth, None if unknown 

168 """ 

169 request_context = event.get("requestContext", {}) 

170 auth = request_context.get("authorizer", {}) 

171 

172 # Handle lambda-wrapped context 

173 lam_ctx = auth.get("lambda", auth) if isinstance(auth.get("lambda"), dict) else auth 

174 

175 auth_type = lam_ctx.get("auth_type") 

176 return str(auth_type) if auth_type else None