Coverage for src / augint_library / telemetry.py: 89%

267 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 20:22 +0000

1"""Telemetry module for anonymous usage tracking and error reporting. 

2 

3This module provides opt-in telemetry functionality for gathering anonymous 

4usage statistics and error reports to improve the library. All telemetry is 

5privacy-conscious and requires explicit user consent. 

6 

7Privacy guarantees: 

8- No personal information is collected 

9- All file paths and hostnames are scrubbed 

10- Users must explicitly opt-in 

11- Telemetry can be disabled at any time 

12""" 

13 

14import contextlib 

15import json 

16import logging 

17import os 

18import platform 

19import uuid 

20from datetime import datetime, timezone 

21from functools import wraps 

22from importlib.metadata import version 

23from pathlib import Path 

24from typing import Any, Callable, Optional 

25 

26from .constants import DEFAULT_TELEMETRY_FLUSH_TIMEOUT, JSON_INDENT, TELEMETRY_SAMPLE_RATE 

27 

# sentry_sdk is an optional dependency. When it cannot be imported, the names
# this module uses from it are replaced by inert stand-ins so the rest of the
# module can call them unconditionally.
try:
    import sentry_sdk
    from sentry_sdk.integrations.logging import LoggingIntegration
    from sentry_sdk.types import Event, Hint
except ImportError:
    SENTRY_AVAILABLE = False

    # ModuleType is the base class of the stand-in so that type checkers
    # accept it wherever the real module object is expected.
    from types import ModuleType

    class Event(dict):  # type: ignore[no-redef,type-arg]
        """Placeholder for ``sentry_sdk.types.Event`` (a plain dict)."""

    class Hint(dict):  # type: ignore[no-redef,type-arg]
        """Placeholder for ``sentry_sdk.types.Hint`` (a plain dict)."""

    class LoggingIntegration:  # type: ignore[no-redef]
        """Placeholder integration that accepts and ignores any options."""

        def __init__(self, **kwargs: Any) -> None:
            """Accept arbitrary keyword options and discard them."""

    class _DummySentryModule(ModuleType):
        """Module-like object whose Sentry entry points all do nothing."""

        def __init__(self) -> None:
            """Register under the name ``sentry_sdk`` and attach no-op metrics."""
            super().__init__("sentry_sdk")
            self.metrics = self._Metrics()

        def init(self, *args: Any, **kwargs: Any) -> None:
            """Ignore SDK initialization."""

        def set_user(self, *args: Any, **kwargs: Any) -> None:
            """Ignore user context."""

        def set_tag(self, *args: Any, **kwargs: Any) -> None:
            """Ignore tags."""

        def capture_message(self, *args: Any, **kwargs: Any) -> None:
            """Ignore captured messages."""

        def capture_exception(self, *args: Any, **kwargs: Any) -> None:
            """Ignore captured exceptions."""

        def flush(self, *args: Any, **kwargs: Any) -> None:
            """Ignore flush requests."""

        @contextlib.contextmanager
        def push_scope(self) -> Any:
            """Yield a scope object whose ``set_extra`` does nothing."""

            class DummyScope:
                def set_extra(self, *args: Any, **kwargs: Any) -> None:
                    """Ignore extra context."""

            yield DummyScope()

        class _Metrics:
            """No-op replacement for the ``sentry_sdk.metrics`` namespace."""

            def incr(self, *args: Any, **kwargs: Any) -> None:
                """Ignore counter increments."""

            def distribution(self, *args: Any, **kwargs: Any) -> None:
                """Ignore distribution samples."""

    sentry_sdk = _DummySentryModule()
else:
    SENTRY_AVAILABLE = True


logger = logging.getLogger(__name__)

101 

102 

class TelemetryClient:
    """Privacy-conscious telemetry client for community usage tracking.

    On construction the client decides whether telemetry is enabled (an
    environment variable override, then CI detection, then the stored consent
    file) and, if so, initializes Sentry. All ``track_*`` methods are
    best-effort: they catch every exception and only log it at debug level.
    """

    # Substrings that mark a dictionary key as sensitive. Matching is by
    # case-insensitive containment, so e.g. "api_key_id" and "HostName" match.
    _SENSITIVE_KEY_TERMS = frozenset(
        {
            "password",
            "token",
            "key",
            "secret",
            "credential",
            "auth",
            "api_key",
            "access_token",
            "private",
            "path",
            "file",
            "dir",
            "directory",
            "home",
            "user",
            "username",
            "email",
            "host",
            "hostname",
            "ip",
            "address",
        }
    )

    def __init__(self) -> None:
        """Initialize telemetry client, checking consent and configuration."""
        self._enabled = False
        self._initialized = False
        self._anonymous_id: Optional[str] = None
        self._package_name = self._get_package_name()
        self._consent_file = Path.home() / f".{self._package_name}" / "consent.json"

        # Check if telemetry should be enabled
        if self._should_enable_telemetry():
            self._initialize_sentry()

    def _get_package_name(self) -> str:
        """Return the root package name with underscores replaced by hyphens."""
        if __package__:
            # Extract root package name from module path
            return __package__.split(".")[0].replace("_", "-")
        # Fallback to the name of the directory containing this file
        return Path(__file__).parent.name.replace("_", "-")

    def _env_var_name(self, suffix: str) -> str:
        """Build ``<PACKAGE_NAME>_<suffix>`` from the upper-cased package name."""
        return f"{self._package_name.replace('-', '_').upper()}_{suffix}"

    def _should_enable_telemetry(self) -> bool:
        """Check if telemetry should be enabled based on consent and environment.

        Precedence: the ``<PACKAGE>_TELEMETRY_ENABLED`` environment variable
        (only the values "true"/"false", case-insensitive, are honored), then
        CI detection (always disables), then the stored consent file.
        """
        env_enabled = os.getenv(self._env_var_name("TELEMETRY_ENABLED"), "").lower()
        if env_enabled == "false":
            return False
        if env_enabled == "true":
            return True

        # CI environment detection (disable in CI)
        if self._is_ci_environment():
            return False

        # Check stored consent
        return self._check_stored_consent()

    def _is_ci_environment(self) -> bool:
        """Return True if any well-known CI environment variable is non-empty."""
        ci_env_vars = [
            "CI",
            "CONTINUOUS_INTEGRATION",
            "GITHUB_ACTIONS",
            "GITLAB_CI",
            "JENKINS_URL",
            "TRAVIS",
        ]
        return any(os.getenv(var) for var in ci_env_vars)

    def _load_consent_data(self) -> dict[str, Any]:
        """Return the parsed consent file, or ``{}`` if it cannot be used.

        A missing or unreadable file, invalid JSON, undecodable bytes and JSON
        whose top-level value is not an object all yield an empty dict, so a
        damaged consent file can never raise out of telemetry code.
        """
        try:
            with self._consent_file.open(encoding="utf-8") as f:
                consent_data = json.load(f)
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            return {}
        return consent_data if isinstance(consent_data, dict) else {}

    def _check_stored_consent(self) -> bool:
        """Check if user has previously consented to telemetry."""
        return bool(self._load_consent_data().get("telemetry_enabled", False))

    def _get_anonymous_id(self) -> str:
        """Get or create the anonymous user ID.

        The ID is cached on the instance. It is taken from the consent file
        when present there; otherwise a fresh random UUID4 is generated (and
        is only persisted when ``set_consent`` writes the file).
        """
        if self._anonymous_id:
            return self._anonymous_id

        # Use existing ID or create new one
        self._anonymous_id = self._load_consent_data().get("anonymous_id") or str(uuid.uuid4())
        return self._anonymous_id

    def _initialize_sentry(self) -> None:
        """Initialize Sentry SDK with privacy-conscious settings.

        Does nothing when Sentry is unavailable or ``SENTRY_DSN`` is unset.
        Any failure during initialization is logged at debug level and leaves
        the client disabled.
        """
        # Use helper to allow test mocking of SENTRY_AVAILABLE
        if not _is_sentry_available():
            return

        # Get DSN from environment
        dsn = os.getenv("SENTRY_DSN")
        if not dsn:
            return

        try:
            # Reuse the name resolved in __init__ so the __file__ fallback of
            # _get_package_name also applies here.
            package_name = self._package_name

            # Try to get version, fallback to 'unknown' for test environments
            try:
                pkg_version = version(package_name)
                release = f"{package_name}@{pkg_version}"
            except Exception:
                # In test environments, the package might not be installed
                release = f"{package_name}@unknown"

            sentry_sdk.init(
                dsn=dsn,
                # Scrub sensitive data
                before_send=self._scrub_sensitive_data,
                before_send_transaction=self._scrub_sensitive_data,
                # Sample rate for performance monitoring
                traces_sample_rate=TELEMETRY_SAMPLE_RATE,
                # Release tracking
                release=release,
                # Environment
                environment=os.getenv(self._env_var_name("ENVIRONMENT"), "production"),
                # Disable automatic user tracking
                send_default_pii=False,
                # Integrations
                integrations=[
                    LoggingIntegration(
                        level=None,  # Don't capture logs
                        event_level=None,
                    ),
                ],
                # Don't attach stack traces to non-exception events. Frame
                # local variables are a separate matter; they are removed
                # in _scrub_stacktraces.
                attach_stacktrace=False,
            )

            # Set anonymous user context
            sentry_sdk.set_user({"id": self._get_anonymous_id()})

            # Set global tags
            sentry_sdk.set_tag("python_version", platform.python_version())
            sentry_sdk.set_tag("platform", platform.system())
            sentry_sdk.set_tag("platform_version", platform.version())

            self._initialized = True
            self._enabled = True

        except Exception:
            # Silently fail if Sentry initialization fails
            logger.debug("Failed to initialize Sentry", exc_info=True)

    def _scrub_sensitive_data(
        self,
        event: Event,
        hint: Hint,  # noqa: ARG002
    ) -> Optional[Event]:
        """Scrub sensitive data from a Sentry event before it is sent.

        The event is modified in place and returned. A falsy event yields
        ``None``.
        """
        if not event:
            return None

        # Scrub file paths in stack traces
        self._scrub_stacktraces(event)

        # Scrub breadcrumbs
        self._scrub_breadcrumbs(event)

        # Scrub extra context
        if "extra" in event and isinstance(event["extra"], dict):
            event["extra"] = self._scrub_dict(event["extra"])

        # Replace any user data with just the anonymous ID
        if "user" in event:
            event["user"] = {"id": self._get_anonymous_id()}

        # Remove request data and server name
        event.pop("request", None)
        event.pop("server_name", None)

        return event

    def _scrub_stacktraces(self, event_dict: Event) -> None:
        """Anonymize frame paths and drop frame locals, in place.

        Both ``filename`` and ``abs_path`` of every exception frame are passed
        through ``_anonymize_path``; ``vars`` is removed from every frame.
        Entries that do not have the expected dict shape are skipped.
        """
        exceptions = event_dict.get("exception")
        if not isinstance(exceptions, dict):
            return

        for exception in exceptions.get("values") or []:
            if not isinstance(exception, dict):
                continue
            stacktrace = exception.get("stacktrace")
            if not isinstance(stacktrace, dict):
                continue
            for frame in stacktrace.get("frames") or []:
                if not isinstance(frame, dict):
                    continue
                # abs_path carries the full absolute path, so it must be
                # anonymized just like filename.
                for path_key in ("filename", "abs_path"):
                    if isinstance(frame.get(path_key), str):
                        frame[path_key] = self._anonymize_path(frame[path_key])
                frame.pop("vars", None)

    def _scrub_breadcrumbs(self, event_dict: Event) -> None:
        """Scrub the ``data`` dict of every breadcrumb, in place."""
        breadcrumbs = event_dict.get("breadcrumbs")
        if not isinstance(breadcrumbs, dict):
            return

        for crumb in breadcrumbs.get("values") or []:
            if isinstance(crumb, dict) and isinstance(crumb.get("data"), dict):
                crumb["data"] = self._scrub_dict(crumb["data"])

    def _anonymize_path(self, filepath: str) -> str:
        """Anonymize a file path to remove user-specific information.

        Returns, in order of preference: the path below the first component
        containing "site-packages"; the path starting at the package's module
        directory; or just the final path component.
        """
        parts = Path(filepath).parts

        # If it's within site-packages, show relative path
        for i, part in enumerate(parts):
            if "site-packages" in part:
                return str(Path(*parts[i + 1 :]))

        # If it contains the package module, show path from there
        module_name = self._package_name.replace("-", "_")
        for i, part in enumerate(parts):
            if part == module_name:
                return str(Path(*parts[i:]))

        # Otherwise just show the filename
        return Path(filepath).name

    def _is_sensitive_key(self, key: Any) -> bool:
        """Return True if ``key`` contains any sensitive term (case-insensitive)."""
        lowered = str(key).lower()
        return any(term in lowered for term in self._SENSITIVE_KEY_TERMS)

    def _scrub_value(self, value: Any) -> Any:
        """Scrub one value: recurse into containers, mask path-like strings."""
        if isinstance(value, dict):
            return self._scrub_dict(value)
        if isinstance(value, str):
            # Anything containing a path separator is treated as a file path
            return "[PATH]" if "/" in value or "\\" in value else value
        if isinstance(value, (list, tuple)):
            items = [self._scrub_value(item) for item in value]
            return items if isinstance(value, list) else tuple(items)
        return value

    def _scrub_dict(self, data: dict[str, Any]) -> dict[str, Any]:
        """Recursively scrub sensitive information from dictionaries.

        Returns a new dict; ``data`` itself is not modified. Values under a
        sensitive key become "[REDACTED]"; strings containing "/" or "\\"
        become "[PATH]"; nested dicts, lists and tuples are scrubbed
        recursively; everything else is copied unchanged.
        """
        scrubbed: dict[str, Any] = {}
        for key, value in data.items():
            if self._is_sensitive_key(key):
                scrubbed[key] = "[REDACTED]"
            else:
                scrubbed[key] = self._scrub_value(value)
        return scrubbed

    @property
    def enabled(self) -> bool:
        """Check if telemetry is enabled."""
        return self._enabled

    def set_consent(self, enabled: bool, prompt_shown: bool = True) -> None:
        """Set telemetry consent and persist it.

        Existing keys in the consent file are preserved. Initializes Sentry
        when consent is granted for the first time and disables it when
        consent is withdrawn.

        Raises:
            OSError: If the consent directory or file cannot be written.
        """
        # Create config directory if it doesn't exist
        self._consent_file.parent.mkdir(parents=True, exist_ok=True)

        # Load existing data (empty if missing or unusable) and update it
        consent_data = self._load_consent_data()
        consent_data.update(
            {
                "telemetry_enabled": enabled,
                "anonymous_id": self._get_anonymous_id(),
                "consent_timestamp": datetime.now(timezone.utc).isoformat(),
                "prompt_shown": prompt_shown,
            }
        )

        # Write consent file
        with self._consent_file.open("w", encoding="utf-8") as f:
            json.dump(consent_data, f, indent=JSON_INDENT)

        # Update internal state
        self._enabled = enabled
        if enabled and not self._initialized:
            self._initialize_sentry()
        elif not enabled and self._initialized:
            # Disable Sentry by re-initializing without a DSN
            if _is_sentry_available():
                sentry_sdk.init(dsn=None)
            self._initialized = False

    def track_command(
        self, command: str, success: bool = True, duration: Optional[float] = None
    ) -> None:
        """Track CLI command execution as a message event and as metrics."""
        if not self._enabled or not _is_sentry_available():
            return

        try:
            # Track as custom event
            sentry_sdk.capture_message(
                f"Command executed: {command}",
                level="info",
                extras={
                    "command": command,
                    "success": success,
                    "duration": duration,
                },
            )

            # Also track as metric
            tags = {
                "command": command,
                "success": str(success),
            }

            # Increment counter
            sentry_sdk.metrics.incr(
                key="cli.command.executed",
                value=1,
                tags=tags,
            )

            # Track duration if provided
            if duration is not None:
                sentry_sdk.metrics.distribution(
                    key="cli.command.duration",
                    value=duration,
                    unit="second",
                    tags=tags,
                )

        except Exception:
            # Never let telemetry break the application
            logger.debug("Failed to track telemetry", exc_info=True)

    def track_error(self, error: Exception, context: Optional[dict[str, Any]] = None) -> None:
        """Report an exception, attaching scrubbed ``context`` as extras."""
        if not self._enabled or not _is_sentry_available():
            return

        try:
            if context:
                scrubbed_context = self._scrub_dict(context)
                with sentry_sdk.push_scope() as scope:
                    for key, value in scrubbed_context.items():
                        scope.set_extra(key, value)
                    sentry_sdk.capture_exception(error)
            else:
                sentry_sdk.capture_exception(error)

        except Exception:
            # Never let telemetry break the application
            logger.debug("Failed to track telemetry", exc_info=True)

    def track_metric(
        self,
        name: str,
        value: float,
        unit: Optional[str] = None,
        tags: Optional[dict[str, str]] = None,
    ) -> None:
        """Record ``value`` as a distribution metric named ``custom.<name>``."""
        if not self._enabled or not _is_sentry_available():
            return

        try:
            metric_kwargs: dict[str, Any] = {
                "key": f"custom.{name}",
                "value": value,
                "tags": tags or {},
            }
            # Only pass a unit when the caller supplied one
            if unit is not None:
                metric_kwargs["unit"] = unit
            sentry_sdk.metrics.distribution(**metric_kwargs)
        except Exception:
            # Never let telemetry break the application
            logger.debug("Failed to track telemetry", exc_info=True)

    def flush(self, timeout: float = DEFAULT_TELEMETRY_FLUSH_TIMEOUT) -> None:
        """Flush pending telemetry events, ignoring any error."""
        if self._enabled and _is_sentry_available():
            with contextlib.suppress(Exception):
                sentry_sdk.flush(timeout=timeout)

512 

513 

class _TelemetryClientHolder:
    """Namespace holding the process-wide telemetry client instance."""

    # Stays None until get_telemetry_client() creates the client on first use.
    client: Optional[TelemetryClient] = None

517 

518 

def _set_sentry_available(value: bool) -> None:
    """Override the module-level SENTRY_AVAILABLE flag.

    Test helper only; not part of the public API.
    """
    globals().update(SENTRY_AVAILABLE=value)

523 

524 

def _is_sentry_available() -> bool:
    """Report the current module-level SENTRY_AVAILABLE flag as a bool.

    The flag is looked up on every call (defaulting to False when absent) so
    that tests can change it at runtime.
    """
    flag = globals().get("SENTRY_AVAILABLE", False)
    return True if flag else False

529 

530 

def get_telemetry_client() -> TelemetryClient:
    """Return the shared telemetry client, creating it on first call."""
    holder = _TelemetryClientHolder
    if holder.client is not None:
        return holder.client

    holder.client = TelemetryClient()
    return holder.client

536 

537 

def track_command_execution(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator to automatically track command execution.

    The wrapped function is reported under its ``__name__`` together with its
    success flag and duration in seconds. If it raises an ``Exception``, the
    failure and the error are tracked and the exception is re-raised
    unchanged.
    """
    # Local import keeps this block self-contained. A monotonic clock is used
    # for durations because wall-clock time can jump or run backwards.
    import time

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        client = get_telemetry_client()
        command_name = func.__name__
        started = time.perf_counter()

        try:
            result = func(*args, **kwargs)
        except Exception as e:
            # Track failed execution
            duration = time.perf_counter() - started
            client.track_command(command_name, success=False, duration=duration)
            client.track_error(e, {"command": command_name})
            raise

        # Track successful execution
        duration = time.perf_counter() - started
        client.track_command(command_name, success=True, duration=duration)
        return result

    return wrapper