Coverage for src / ai_lls_lib / payment / credit_manager.py: 90%
164 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 23:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 23:45 +0000
1"""Credit balance management with DynamoDB."""
3import logging
4import os
5from datetime import UTC, datetime
6from decimal import Decimal
7from typing import TYPE_CHECKING, Any
9try:
10 import boto3
11 from botocore.exceptions import ClientError
13 HAS_BOTO3 = True
14except ImportError:
15 boto3 = None # type: ignore[assignment]
16 HAS_BOTO3 = False
18if TYPE_CHECKING:
19 from mypy_boto3_dynamodb.service_resource import Table
21logger = logging.getLogger(__name__)
24class InsufficientCreditsError(Exception):
25 """Raised when user has insufficient credits for an operation."""
27 def __init__(self, credits_required: int, credits_available: int):
28 self.credits_required = credits_required
29 self.credits_available = credits_available
30 super().__init__(
31 f"Insufficient credits: required {credits_required}, available {credits_available}"
32 )
35class CreditManager:
36 """
37 Manages user credit balances in DynamoDB CreditsTable.
38 """
40 table: "Table | None"
42 def __init__(self, table_name: str | None = None):
43 """Initialize with DynamoDB table."""
44 if not HAS_BOTO3 or not boto3:
45 raise RuntimeError("boto3 is required for CreditManager")
47 self.dynamodb = boto3.resource("dynamodb")
48 self.table_name = table_name if table_name else os.environ["CREDITS_TABLE"]
50 try:
51 self.table = self.dynamodb.Table(self.table_name)
52 except Exception as e:
53 logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}")
54 self.table = None
56 def get_balance(self, user_id: str) -> int:
57 """Get current credit balance for a user."""
58 if not self.table:
59 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
61 try:
62 response = self.table.get_item(Key={"user_id": user_id})
63 if "Item" in response:
64 credits_val = response["Item"].get("credits", 0)
65 # DynamoDB returns Decimal for numbers - convert safely
66 if credits_val is None:
67 return 0
68 return int(Decimal(str(credits_val)))
69 return 0
70 except ClientError as e:
71 logger.error(f"Error getting balance for {user_id}: {e}")
72 return 0
74 def add_credits(self, user_id: str, amount: int) -> int:
75 """Add credits to user balance and return new balance."""
76 if not self.table:
77 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
79 try:
80 response = self.table.update_item(
81 Key={"user_id": user_id},
82 UpdateExpression="ADD credits :amount SET updated_at = :now",
83 ExpressionAttributeValues={
84 ":amount": Decimal(amount),
85 ":now": datetime.now(UTC).isoformat(),
86 },
87 ReturnValues="ALL_NEW",
88 )
89 attrs = response.get("Attributes", {})
90 credits_val = attrs.get("credits", 0)
91 # DynamoDB returns Decimal for numbers - convert safely
92 if credits_val is None:
93 return 0
94 return int(Decimal(str(credits_val)))
95 except ClientError as e:
96 logger.error(f"Error adding credits for {user_id}: {e}")
97 raise
99 def deduct_credits(self, user_id: str, amount: int) -> bool:
100 """
101 Deduct credits from user balance.
102 Returns True if successful, False if insufficient balance.
103 """
104 if not self.table:
105 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
107 try:
108 # Conditional update - only deduct if balance >= amount
109 self.table.update_item(
110 Key={"user_id": user_id},
111 UpdateExpression="ADD credits :negative_amount SET updated_at = :now",
112 ConditionExpression="credits >= :amount",
113 ExpressionAttributeValues={
114 ":negative_amount": Decimal(-amount),
115 ":amount": Decimal(amount),
116 ":now": datetime.now(UTC).isoformat(),
117 },
118 )
119 return True
120 except ClientError as e:
121 if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
122 logger.info(f"Insufficient credits for {user_id}")
123 return False
124 logger.error(f"Error deducting credits for {user_id}: {e}")
125 raise
127 def set_credits(self, user_id: str, credits: int, reason: str | None = None) -> int:
128 """Set user credit balance to an exact value and return new balance."""
129 if not self.table:
130 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
132 try:
133 if reason:
134 logger.info(f"Setting credits for {user_id} to {credits}: {reason}")
136 response = self.table.update_item(
137 Key={"user_id": user_id},
138 UpdateExpression="SET credits = :credits, updated_at = :now",
139 ExpressionAttributeValues={
140 ":credits": Decimal(credits),
141 ":now": datetime.now(UTC).isoformat(),
142 },
143 ReturnValues="ALL_NEW",
144 )
145 attrs = response.get("Attributes", {})
146 credits_val = attrs.get("credits", 0)
147 if credits_val is None:
148 return 0
149 return int(Decimal(str(credits_val)))
150 except ClientError as e:
151 logger.error(f"Error setting credits for {user_id}: {e}")
152 raise
154 def set_subscription_state(
155 self,
156 user_id: str,
157 status: str,
158 stripe_customer_id: str | None = None,
159 stripe_subscription_id: str | None = None,
160 ) -> None:
161 """Update subscription state in CreditsTable."""
162 if not self.table:
163 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
165 try:
166 update_expr = "SET subscription_status = :status, updated_at = :now"
167 expr_values = {":status": status, ":now": datetime.now(UTC).isoformat()}
169 if stripe_customer_id:
170 update_expr += ", stripe_customer_id = :customer_id"
171 expr_values[":customer_id"] = stripe_customer_id
173 if stripe_subscription_id:
174 update_expr += ", stripe_subscription_id = :subscription_id"
175 expr_values[":subscription_id"] = stripe_subscription_id
177 self.table.update_item(
178 Key={"user_id": user_id},
179 UpdateExpression=update_expr,
180 ExpressionAttributeValues=expr_values,
181 )
182 except ClientError as e:
183 logger.error(f"Error updating subscription state for {user_id}: {e}")
184 raise
186 def get_user_payment_info(self, user_id: str) -> dict[str, Any]:
187 """Get user's payment-related information."""
188 if not self.table:
189 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
191 try:
192 response = self.table.get_item(Key={"user_id": user_id})
193 if "Item" in response:
194 item = response["Item"]
195 credits_val = item.get("credits", 0)
196 # DynamoDB returns Decimal for numbers - convert safely
197 credits_int = 0 if credits_val is None else int(Decimal(str(credits_val)))
198 return {
199 "credits": credits_int,
200 "stripe_customer_id": item.get("stripe_customer_id"),
201 "stripe_subscription_id": item.get("stripe_subscription_id"),
202 "subscription_status": item.get("subscription_status"),
203 }
204 return {
205 "credits": 0,
206 "stripe_customer_id": None,
207 "stripe_subscription_id": None,
208 "subscription_status": None,
209 }
210 except ClientError as e:
211 logger.error(f"Error getting payment info for {user_id}: {e}")
212 return {
213 "credits": 0,
214 "stripe_customer_id": None,
215 "stripe_subscription_id": None,
216 "subscription_status": None,
217 }
219 def has_unlimited_access(self, user_id: str) -> bool:
220 """Check if user has unlimited access via active subscription."""
221 info = self.get_user_payment_info(user_id)
222 return info.get("subscription_status") == "active"
224 def set_stripe_customer_id(self, user_id: str, stripe_customer_id: str) -> None:
225 """Store Stripe customer ID for a user."""
226 if not self.table:
227 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
229 try:
230 self.table.update_item(
231 Key={"user_id": user_id},
232 UpdateExpression="SET stripe_customer_id = :customer_id, updated_at = :now",
233 ExpressionAttributeValues={
234 ":customer_id": stripe_customer_id,
235 ":now": datetime.now(UTC).isoformat(),
236 },
237 )
238 logger.info(f"Stored Stripe customer ID {stripe_customer_id} for user {user_id}")
239 except ClientError as e:
240 logger.error(f"Error storing Stripe customer ID for {user_id}: {e}")
241 raise
243 def idempotent_credit_grant(self, user_id: str, amount: int, payment_intent_id: str) -> bool:
244 """Grant credits only if payment_intent_id hasn't been processed before.
246 Uses DynamoDB conditional update with a processed_payments string set
247 to prevent double-crediting.
249 Returns True if credits were granted, False if already processed.
250 """
251 if not self.table:
252 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
254 try:
255 self.table.update_item(
256 Key={"user_id": user_id},
257 UpdateExpression=(
258 "ADD credits :amount, processed_payments :pid_set SET updated_at = :now"
259 ),
260 ConditionExpression="NOT contains(processed_payments, :pid)",
261 ExpressionAttributeValues={
262 ":amount": Decimal(amount),
263 ":pid_set": {payment_intent_id},
264 ":pid": payment_intent_id,
265 ":now": datetime.now(UTC).isoformat(),
266 },
267 )
268 logger.info(f"Granted {amount} credits to {user_id} for payment {payment_intent_id}")
269 return True
270 except ClientError as e:
271 if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
272 logger.info(f"Payment {payment_intent_id} already processed for {user_id}")
273 return False
274 logger.error(f"Error granting credits for {user_id}: {e}")
275 raise
277 def grant_first_card_bonus(self, user_id: str, bonus_amount: int = 700) -> dict[str, Any]:
278 """Grant one-time credit bonus on first card save.
280 Uses conditional update on first_card_bonus_granted flag to prevent
281 duplicate grants.
283 Returns dict with granted status, credits_added, and message.
284 """
285 if not self.table:
286 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible")
288 try:
289 self.table.update_item(
290 Key={"user_id": user_id},
291 UpdateExpression=(
292 "SET first_card_bonus_granted = :true_val, updated_at = :now ADD credits :bonus"
293 ),
294 ConditionExpression=(
295 "attribute_not_exists(first_card_bonus_granted) "
296 "OR first_card_bonus_granted = :false_val"
297 ),
298 ExpressionAttributeValues={
299 ":true_val": True,
300 ":false_val": False,
301 ":bonus": Decimal(bonus_amount),
302 ":now": datetime.now(UTC).isoformat(),
303 },
304 )
305 logger.info(f"Granted {bonus_amount} first-card bonus to {user_id}")
306 return {
307 "granted": True,
308 "credits_added": bonus_amount,
309 "message": f"First card bonus of {bonus_amount} credits granted",
310 }
311 except ClientError as e:
312 if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
313 logger.info(f"First card bonus already granted for {user_id}")
314 return {
315 "granted": False,
316 "credits_added": 0,
317 "message": "First card bonus already granted",
318 }
319 logger.error(f"Error granting first card bonus for {user_id}: {e}")
320 raise
322 def check_and_deduct(self, user_id: str, amount: int) -> dict[str, Any]:
323 """Check unlimited access, then check balance, then deduct credits.
325 Returns dict with deducted status, amount, unlimited flag, and balance.
326 Raises InsufficientCreditsError if balance is insufficient.
327 """
328 if self.has_unlimited_access(user_id):
329 return {"deducted": False, "amount": 0, "unlimited": True, "balance": 0}
331 balance = self.get_balance(user_id)
332 if balance < amount:
333 raise InsufficientCreditsError(credits_required=amount, credits_available=balance)
335 success = self.deduct_credits(user_id, amount)
336 if not success:
337 # Race condition: balance changed between check and deduct
338 balance = self.get_balance(user_id)
339 raise InsufficientCreditsError(credits_required=amount, credits_available=balance)
341 new_balance = self.get_balance(user_id)
342 return {
343 "deducted": True,
344 "amount": amount,
345 "unlimited": False,
346 "balance": new_balance,
347 }