Coverage for src / ai_lls_lib / auth / context_parser.py: 100%
46 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 23:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 23:45 +0000
1"""Auth context parser for HTTP API v2.0 events."""
3import functools
4import json
5from collections.abc import Callable
6from typing import Any
9def get_user_from_event(event: dict[str, Any]) -> str | None:
10 """
11 Extract user ID from HTTP API v2.0 event with all possible paths.
12 Handles both JWT and API key authentication contexts.
14 This function handles the complexities of AWS API Gateway authorizer contexts,
15 especially when EnableSimpleResponses is set to false, which wraps the
16 context in a 'lambda' key.
18 Args:
19 event: The Lambda event from API Gateway HTTP API v2.0
21 Returns:
22 User ID string if found, None otherwise
23 """
24 request_context = event.get("requestContext", {})
25 auth = request_context.get("authorizer", {})
27 # Handle lambda-wrapped context (EnableSimpleResponses: false)
28 # When EnableSimpleResponses is false, the authorizer context is wrapped
29 lam_ctx = auth.get("lambda", auth) if isinstance(auth.get("lambda"), dict) else auth
31 # Try all possible paths for user_id in priority order
32 user_id = (
33 # Lambda authorizer paths (most common with current setup)
34 lam_ctx.get("principal_id")
35 or lam_ctx.get("principalId")
36 or lam_ctx.get("sub")
37 or lam_ctx.get("user_id")
38 or
39 # JWT paths (when using JWT authorizer directly)
40 auth.get("jwt", {}).get("claims", {}).get("sub")
41 or
42 # Direct auth paths (fallback)
43 auth.get("principal_id")
44 or auth.get("principalId")
45 or auth.get("sub")
46 )
48 return str(user_id) if user_id else None
51def get_email_from_event(event: dict[str, Any]) -> str | None:
52 """
53 Extract email from HTTP API v2.0 event.
55 Args:
56 event: The Lambda event from API Gateway HTTP API v2.0
58 Returns:
59 Email string if found, None otherwise
60 """
61 request_context = event.get("requestContext", {})
62 auth = request_context.get("authorizer", {})
64 # Handle lambda-wrapped context
65 lam_ctx = auth.get("lambda", auth) if isinstance(auth.get("lambda"), dict) else auth
67 # Try to get email from various locations
68 email = (
69 lam_ctx.get("email")
70 or auth.get("jwt", {}).get("claims", {}).get("email")
71 or auth.get("email")
72 )
74 return str(email) if email else None
77def is_admin(event: dict[str, Any]) -> bool:
78 """
79 Check if the authenticated user has admin privileges.
81 The is_admin flag is set by the authorizer based on Cognito group membership.
82 API key authentication never has admin access.
84 Args:
85 event: The Lambda event from API Gateway HTTP API v2.0
87 Returns:
88 True if user is an admin, False otherwise
89 """
90 request_context = event.get("requestContext", {})
91 auth = request_context.get("authorizer", {})
93 # Handle lambda-wrapped context
94 lam_ctx = auth.get("lambda", auth) if isinstance(auth.get("lambda"), dict) else auth
96 # Check is_admin from authorizer context (set as string 'true' or 'false')
97 is_admin_str = lam_ctx.get("is_admin", "false")
99 # Handle both string and boolean values
100 if isinstance(is_admin_str, bool):
101 return is_admin_str
102 return str(is_admin_str).lower() == "true"
105def get_groups_from_event(event: dict[str, Any]) -> list[str]:
106 """
107 Extract user groups from HTTP API v2.0 event.
109 Groups are passed as a comma-separated string in the authorizer context.
111 Args:
112 event: The Lambda event from API Gateway HTTP API v2.0
114 Returns:
115 List of group names, empty list if none
116 """
117 request_context = event.get("requestContext", {})
118 auth = request_context.get("authorizer", {})
120 # Handle lambda-wrapped context
121 lam_ctx = auth.get("lambda", auth) if isinstance(auth.get("lambda"), dict) else auth
123 # Groups are stored as comma-separated string
124 groups_str = lam_ctx.get("groups", "")
126 if not groups_str:
127 return []
129 return [g.strip() for g in groups_str.split(",") if g.strip()]
132def require_admin(func: Callable) -> Callable:
133 """
134 Decorator that requires admin access for a handler function.
136 Returns 403 Forbidden if the user is not an admin.
138 Usage:
139 @require_admin
140 def handler(event, context):
141 # Only runs if user is admin
142 ...
143 """
145 @functools.wraps(func)
146 def wrapper(event: dict[str, Any], context: Any) -> dict[str, Any]:
147 if not is_admin(event):
148 return {
149 "statusCode": 403,
150 "headers": {"Content-Type": "application/json"},
151 "body": json.dumps({"error": "Admin access required"}),
152 }
153 result: dict[str, Any] = func(event, context)
154 return result
156 return wrapper
159def get_auth_type(event: dict[str, Any]) -> str | None:
160 """
161 Get the authentication type used for this request.
163 Args:
164 event: The Lambda event from API Gateway HTTP API v2.0
166 Returns:
167 'jwt' for Cognito token auth, 'api_key' for API key auth, None if unknown
168 """
169 request_context = event.get("requestContext", {})
170 auth = request_context.get("authorizer", {})
172 # Handle lambda-wrapped context
173 lam_ctx = auth.get("lambda", auth) if isinstance(auth.get("lambda"), dict) else auth
175 auth_type = lam_ctx.get("auth_type")
176 return str(auth_type) if auth_type else None