Coverage for src / ai_lls_lib / payment / credit_manager.py: 90%

164 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 23:45 +0000

1"""Credit balance management with DynamoDB.""" 

2 

3import logging 

4import os 

5from datetime import UTC, datetime 

6from decimal import Decimal 

7from typing import TYPE_CHECKING, Any 

8 

9try: 

10 import boto3 

11 from botocore.exceptions import ClientError 

12 

13 HAS_BOTO3 = True 

14except ImportError: 

15 boto3 = None # type: ignore[assignment] 

16 HAS_BOTO3 = False 

17 

18if TYPE_CHECKING: 

19 from mypy_boto3_dynamodb.service_resource import Table 

20 

21logger = logging.getLogger(__name__) 

22 

23 

class InsufficientCreditsError(Exception):
    """Raised when user has insufficient credits for an operation.

    Attributes:
        credits_required: Number of credits the operation needed.
        credits_available: Number of credits the user had.
    """

    def __init__(self, credits_required: int, credits_available: int):
        """Store both amounts and build the human-readable message."""
        self.credits_required = credits_required
        self.credits_available = credits_available
        super().__init__(
            f"Insufficient credits: required {credits_required}, available {credits_available}"
        )

    def __reduce__(self):
        """Support ``copy`` and ``pickle``.

        The default exception reduction replays ``self.args`` (the single
        formatted message) into ``__init__``, which takes two arguments and
        therefore fails with TypeError. Rebuild from the two amounts instead,
        and carry the instance dict so any extra attributes survive.
        """
        return (
            type(self),
            (self.credits_required, self.credits_available),
            dict(self.__dict__),
        )

33 

34 

35class CreditManager: 

36 """ 

37 Manages user credit balances in DynamoDB CreditsTable. 

38 """ 

39 

40 table: "Table | None" 

41 

42 def __init__(self, table_name: str | None = None): 

43 """Initialize with DynamoDB table.""" 

44 if not HAS_BOTO3 or not boto3: 

45 raise RuntimeError("boto3 is required for CreditManager") 

46 

47 self.dynamodb = boto3.resource("dynamodb") 

48 self.table_name = table_name if table_name else os.environ["CREDITS_TABLE"] 

49 

50 try: 

51 self.table = self.dynamodb.Table(self.table_name) 

52 except Exception as e: 

53 logger.error(f"Failed to connect to DynamoDB table {self.table_name}: {e}") 

54 self.table = None 

55 

56 def get_balance(self, user_id: str) -> int: 

57 """Get current credit balance for a user.""" 

58 if not self.table: 

59 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

60 

61 try: 

62 response = self.table.get_item(Key={"user_id": user_id}) 

63 if "Item" in response: 

64 credits_val = response["Item"].get("credits", 0) 

65 # DynamoDB returns Decimal for numbers - convert safely 

66 if credits_val is None: 

67 return 0 

68 return int(Decimal(str(credits_val))) 

69 return 0 

70 except ClientError as e: 

71 logger.error(f"Error getting balance for {user_id}: {e}") 

72 return 0 

73 

74 def add_credits(self, user_id: str, amount: int) -> int: 

75 """Add credits to user balance and return new balance.""" 

76 if not self.table: 

77 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

78 

79 try: 

80 response = self.table.update_item( 

81 Key={"user_id": user_id}, 

82 UpdateExpression="ADD credits :amount SET updated_at = :now", 

83 ExpressionAttributeValues={ 

84 ":amount": Decimal(amount), 

85 ":now": datetime.now(UTC).isoformat(), 

86 }, 

87 ReturnValues="ALL_NEW", 

88 ) 

89 attrs = response.get("Attributes", {}) 

90 credits_val = attrs.get("credits", 0) 

91 # DynamoDB returns Decimal for numbers - convert safely 

92 if credits_val is None: 

93 return 0 

94 return int(Decimal(str(credits_val))) 

95 except ClientError as e: 

96 logger.error(f"Error adding credits for {user_id}: {e}") 

97 raise 

98 

99 def deduct_credits(self, user_id: str, amount: int) -> bool: 

100 """ 

101 Deduct credits from user balance. 

102 Returns True if successful, False if insufficient balance. 

103 """ 

104 if not self.table: 

105 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

106 

107 try: 

108 # Conditional update - only deduct if balance >= amount 

109 self.table.update_item( 

110 Key={"user_id": user_id}, 

111 UpdateExpression="ADD credits :negative_amount SET updated_at = :now", 

112 ConditionExpression="credits >= :amount", 

113 ExpressionAttributeValues={ 

114 ":negative_amount": Decimal(-amount), 

115 ":amount": Decimal(amount), 

116 ":now": datetime.now(UTC).isoformat(), 

117 }, 

118 ) 

119 return True 

120 except ClientError as e: 

121 if e.response["Error"]["Code"] == "ConditionalCheckFailedException": 

122 logger.info(f"Insufficient credits for {user_id}") 

123 return False 

124 logger.error(f"Error deducting credits for {user_id}: {e}") 

125 raise 

126 

127 def set_credits(self, user_id: str, credits: int, reason: str | None = None) -> int: 

128 """Set user credit balance to an exact value and return new balance.""" 

129 if not self.table: 

130 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

131 

132 try: 

133 if reason: 

134 logger.info(f"Setting credits for {user_id} to {credits}: {reason}") 

135 

136 response = self.table.update_item( 

137 Key={"user_id": user_id}, 

138 UpdateExpression="SET credits = :credits, updated_at = :now", 

139 ExpressionAttributeValues={ 

140 ":credits": Decimal(credits), 

141 ":now": datetime.now(UTC).isoformat(), 

142 }, 

143 ReturnValues="ALL_NEW", 

144 ) 

145 attrs = response.get("Attributes", {}) 

146 credits_val = attrs.get("credits", 0) 

147 if credits_val is None: 

148 return 0 

149 return int(Decimal(str(credits_val))) 

150 except ClientError as e: 

151 logger.error(f"Error setting credits for {user_id}: {e}") 

152 raise 

153 

154 def set_subscription_state( 

155 self, 

156 user_id: str, 

157 status: str, 

158 stripe_customer_id: str | None = None, 

159 stripe_subscription_id: str | None = None, 

160 ) -> None: 

161 """Update subscription state in CreditsTable.""" 

162 if not self.table: 

163 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

164 

165 try: 

166 update_expr = "SET subscription_status = :status, updated_at = :now" 

167 expr_values = {":status": status, ":now": datetime.now(UTC).isoformat()} 

168 

169 if stripe_customer_id: 

170 update_expr += ", stripe_customer_id = :customer_id" 

171 expr_values[":customer_id"] = stripe_customer_id 

172 

173 if stripe_subscription_id: 

174 update_expr += ", stripe_subscription_id = :subscription_id" 

175 expr_values[":subscription_id"] = stripe_subscription_id 

176 

177 self.table.update_item( 

178 Key={"user_id": user_id}, 

179 UpdateExpression=update_expr, 

180 ExpressionAttributeValues=expr_values, 

181 ) 

182 except ClientError as e: 

183 logger.error(f"Error updating subscription state for {user_id}: {e}") 

184 raise 

185 

186 def get_user_payment_info(self, user_id: str) -> dict[str, Any]: 

187 """Get user's payment-related information.""" 

188 if not self.table: 

189 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

190 

191 try: 

192 response = self.table.get_item(Key={"user_id": user_id}) 

193 if "Item" in response: 

194 item = response["Item"] 

195 credits_val = item.get("credits", 0) 

196 # DynamoDB returns Decimal for numbers - convert safely 

197 credits_int = 0 if credits_val is None else int(Decimal(str(credits_val))) 

198 return { 

199 "credits": credits_int, 

200 "stripe_customer_id": item.get("stripe_customer_id"), 

201 "stripe_subscription_id": item.get("stripe_subscription_id"), 

202 "subscription_status": item.get("subscription_status"), 

203 } 

204 return { 

205 "credits": 0, 

206 "stripe_customer_id": None, 

207 "stripe_subscription_id": None, 

208 "subscription_status": None, 

209 } 

210 except ClientError as e: 

211 logger.error(f"Error getting payment info for {user_id}: {e}") 

212 return { 

213 "credits": 0, 

214 "stripe_customer_id": None, 

215 "stripe_subscription_id": None, 

216 "subscription_status": None, 

217 } 

218 

219 def has_unlimited_access(self, user_id: str) -> bool: 

220 """Check if user has unlimited access via active subscription.""" 

221 info = self.get_user_payment_info(user_id) 

222 return info.get("subscription_status") == "active" 

223 

224 def set_stripe_customer_id(self, user_id: str, stripe_customer_id: str) -> None: 

225 """Store Stripe customer ID for a user.""" 

226 if not self.table: 

227 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

228 

229 try: 

230 self.table.update_item( 

231 Key={"user_id": user_id}, 

232 UpdateExpression="SET stripe_customer_id = :customer_id, updated_at = :now", 

233 ExpressionAttributeValues={ 

234 ":customer_id": stripe_customer_id, 

235 ":now": datetime.now(UTC).isoformat(), 

236 }, 

237 ) 

238 logger.info(f"Stored Stripe customer ID {stripe_customer_id} for user {user_id}") 

239 except ClientError as e: 

240 logger.error(f"Error storing Stripe customer ID for {user_id}: {e}") 

241 raise 

242 

243 def idempotent_credit_grant(self, user_id: str, amount: int, payment_intent_id: str) -> bool: 

244 """Grant credits only if payment_intent_id hasn't been processed before. 

245 

246 Uses DynamoDB conditional update with a processed_payments string set 

247 to prevent double-crediting. 

248 

249 Returns True if credits were granted, False if already processed. 

250 """ 

251 if not self.table: 

252 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

253 

254 try: 

255 self.table.update_item( 

256 Key={"user_id": user_id}, 

257 UpdateExpression=( 

258 "ADD credits :amount, processed_payments :pid_set SET updated_at = :now" 

259 ), 

260 ConditionExpression="NOT contains(processed_payments, :pid)", 

261 ExpressionAttributeValues={ 

262 ":amount": Decimal(amount), 

263 ":pid_set": {payment_intent_id}, 

264 ":pid": payment_intent_id, 

265 ":now": datetime.now(UTC).isoformat(), 

266 }, 

267 ) 

268 logger.info(f"Granted {amount} credits to {user_id} for payment {payment_intent_id}") 

269 return True 

270 except ClientError as e: 

271 if e.response["Error"]["Code"] == "ConditionalCheckFailedException": 

272 logger.info(f"Payment {payment_intent_id} already processed for {user_id}") 

273 return False 

274 logger.error(f"Error granting credits for {user_id}: {e}") 

275 raise 

276 

277 def grant_first_card_bonus(self, user_id: str, bonus_amount: int = 700) -> dict[str, Any]: 

278 """Grant one-time credit bonus on first card save. 

279 

280 Uses conditional update on first_card_bonus_granted flag to prevent 

281 duplicate grants. 

282 

283 Returns dict with granted status, credits_added, and message. 

284 """ 

285 if not self.table: 

286 raise RuntimeError(f"DynamoDB table {self.table_name} not accessible") 

287 

288 try: 

289 self.table.update_item( 

290 Key={"user_id": user_id}, 

291 UpdateExpression=( 

292 "SET first_card_bonus_granted = :true_val, updated_at = :now ADD credits :bonus" 

293 ), 

294 ConditionExpression=( 

295 "attribute_not_exists(first_card_bonus_granted) " 

296 "OR first_card_bonus_granted = :false_val" 

297 ), 

298 ExpressionAttributeValues={ 

299 ":true_val": True, 

300 ":false_val": False, 

301 ":bonus": Decimal(bonus_amount), 

302 ":now": datetime.now(UTC).isoformat(), 

303 }, 

304 ) 

305 logger.info(f"Granted {bonus_amount} first-card bonus to {user_id}") 

306 return { 

307 "granted": True, 

308 "credits_added": bonus_amount, 

309 "message": f"First card bonus of {bonus_amount} credits granted", 

310 } 

311 except ClientError as e: 

312 if e.response["Error"]["Code"] == "ConditionalCheckFailedException": 

313 logger.info(f"First card bonus already granted for {user_id}") 

314 return { 

315 "granted": False, 

316 "credits_added": 0, 

317 "message": "First card bonus already granted", 

318 } 

319 logger.error(f"Error granting first card bonus for {user_id}: {e}") 

320 raise 

321 

322 def check_and_deduct(self, user_id: str, amount: int) -> dict[str, Any]: 

323 """Check unlimited access, then check balance, then deduct credits. 

324 

325 Returns dict with deducted status, amount, unlimited flag, and balance. 

326 Raises InsufficientCreditsError if balance is insufficient. 

327 """ 

328 if self.has_unlimited_access(user_id): 

329 return {"deducted": False, "amount": 0, "unlimited": True, "balance": 0} 

330 

331 balance = self.get_balance(user_id) 

332 if balance < amount: 

333 raise InsufficientCreditsError(credits_required=amount, credits_available=balance) 

334 

335 success = self.deduct_credits(user_id, amount) 

336 if not success: 

337 # Race condition: balance changed between check and deduct 

338 balance = self.get_balance(user_id) 

339 raise InsufficientCreditsError(credits_required=amount, credits_available=balance) 

340 

341 new_balance = self.get_balance(user_id) 

342 return { 

343 "deducted": True, 

344 "amount": amount, 

345 "unlimited": False, 

346 "balance": new_balance, 

347 }