Coverage for src / augint_library / telemetry.py: 89%
267 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 20:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 20:22 +0000
1"""Telemetry module for anonymous usage tracking and error reporting.
3This module provides opt-in telemetry functionality for gathering anonymous
4usage statistics and error reports to improve the library. All telemetry is
5privacy-conscious and requires explicit user consent.
7Privacy guarantees:
8- No personal information is collected
9- All file paths and hostnames are scrubbed
10- Users must explicitly opt-in
11- Telemetry can be disabled at any time
12"""
14import contextlib
15import json
16import logging
17import os
18import platform
19import uuid
20from datetime import datetime, timezone
21from functools import wraps
22from importlib.metadata import version
23from pathlib import Path
24from typing import Any, Callable, Optional
26from .constants import DEFAULT_TELEMETRY_FLUSH_TIMEOUT, JSON_INDENT, TELEMETRY_SAMPLE_RATE
try:
    import sentry_sdk
    from sentry_sdk.integrations.logging import LoggingIntegration
    from sentry_sdk.types import Event, Hint
except ImportError:
    # Sentry is optional. When any of the imports above fails, install inert
    # stand-ins so the rest of the module can reference the same names.
    from types import ModuleType

    SENTRY_AVAILABLE = False

    class LoggingIntegration:  # type: ignore[no-redef]
        """Inert replacement for sentry's LoggingIntegration."""

        def __init__(self, **kwargs: Any) -> None:
            return None

    # Subclass ModuleType so mypy accepts the object as a module.
    class _DummySentryModule(ModuleType):
        """No-op stand-in for the ``sentry_sdk`` module."""

        class _Metrics:
            """No-op stand-in for ``sentry_sdk.metrics``."""

            def incr(self, *args: Any, **kwargs: Any) -> None:
                return None

            def distribution(self, *args: Any, **kwargs: Any) -> None:
                return None

        def __init__(self) -> None:
            super().__init__("sentry_sdk")
            self.metrics = self._Metrics()

        def init(self, *args: Any, **kwargs: Any) -> None:
            return None

        def set_user(self, *args: Any, **kwargs: Any) -> None:
            return None

        def set_tag(self, *args: Any, **kwargs: Any) -> None:
            return None

        def capture_message(self, *args: Any, **kwargs: Any) -> None:
            return None

        def capture_exception(self, *args: Any, **kwargs: Any) -> None:
            return None

        def flush(self, *args: Any, **kwargs: Any) -> None:
            return None

        @contextlib.contextmanager
        def push_scope(self) -> Any:
            """Yield a scope object whose ``set_extra`` does nothing."""

            class DummyScope:
                def set_extra(self, *args: Any, **kwargs: Any) -> None:
                    return None

            yield DummyScope()

    sentry_sdk = _DummySentryModule()

    # Plain-dict placeholders for the sentry event types used in annotations.
    class Event(dict):  # type: ignore[no-redef,type-arg]
        """Placeholder Event type used when sentry is not installed."""

    class Hint(dict):  # type: ignore[no-redef,type-arg]
        """Placeholder Hint type used when sentry is not installed."""
else:
    SENTRY_AVAILABLE = True
# Module-level logger; the telemetry code below only emits debug-level records
# through it when Sentry initialization or tracking fails.
logger = logging.getLogger(__name__)
103class TelemetryClient:
104 """Privacy-conscious telemetry client for community usage tracking."""
106 def __init__(self) -> None:
107 """Initialize telemetry client, checking consent and configuration."""
108 self._enabled = False
109 self._initialized = False
110 self._anonymous_id: Optional[str] = None
111 self._package_name = self._get_package_name()
112 self._consent_file = Path.home() / f".{self._package_name}" / "consent.json"
114 # Check if telemetry should be enabled
115 if self._should_enable_telemetry():
116 self._initialize_sentry()
118 def _get_package_name(self) -> str:
119 """Get the package name dynamically."""
120 if __package__: 120 ↛ 124line 120 didn't jump to line 124 because the condition on line 120 was always true
121 # Extract root package name from module path
122 return __package__.split(".")[0].replace("_", "-")
123 # Fallback to extracting from module path
124 return Path(__file__).parent.name.replace("_", "-")
126 def _should_enable_telemetry(self) -> bool:
127 """Check if telemetry should be enabled based on consent and environment."""
128 # Environment variable override (highest priority)
129 env_var_name = f"{self._package_name.replace('-', '_').upper()}_TELEMETRY_ENABLED"
130 env_enabled = os.getenv(env_var_name, "").lower()
131 if env_enabled == "false":
132 return False
133 if env_enabled == "true":
134 return True
136 # CI environment detection (disable in CI)
137 if self._is_ci_environment():
138 return False
140 # Check stored consent
141 return self._check_stored_consent()
143 def _is_ci_environment(self) -> bool:
144 """Detect if running in a CI environment."""
145 ci_env_vars = [
146 "CI",
147 "CONTINUOUS_INTEGRATION",
148 "GITHUB_ACTIONS",
149 "GITLAB_CI",
150 "JENKINS_URL",
151 "TRAVIS",
152 ]
153 return any(os.getenv(var) for var in ci_env_vars)
155 def _check_stored_consent(self) -> bool:
156 """Check if user has previously consented to telemetry."""
157 if not self._consent_file.exists():
158 return False
160 try:
161 with self._consent_file.open() as f:
162 consent_data = json.load(f)
163 return bool(consent_data.get("telemetry_enabled", False))
164 except (OSError, json.JSONDecodeError, KeyError):
165 return False
167 def _get_anonymous_id(self) -> str:
168 """Get or create anonymous user ID."""
169 if self._anonymous_id:
170 return self._anonymous_id
172 consent_data = {}
173 if self._consent_file.exists():
174 try:
175 with self._consent_file.open() as f:
176 consent_data = json.load(f)
177 except (OSError, json.JSONDecodeError):
178 pass
180 # Use existing ID or create new one
181 self._anonymous_id = consent_data.get("anonymous_id") or str(uuid.uuid4())
182 return self._anonymous_id
184 def _initialize_sentry(self) -> None:
185 """Initialize Sentry SDK with privacy-conscious settings."""
186 # Use helper to allow test mocking of SENTRY_AVAILABLE
187 if not _is_sentry_available():
188 return
190 # Get DSN from environment
191 dsn = os.getenv("SENTRY_DSN")
192 if not dsn: 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true
193 return
195 try:
196 # Get package name dynamically from module
197 package_name = __package__.split(".")[0].replace("_", "-")
199 # Try to get version, fallback to 'unknown' for test environments
200 try:
201 pkg_version = version(package_name)
202 release = f"{package_name}@{pkg_version}"
203 except Exception:
204 # In test environments, the package might not be installed
205 release = f"{package_name}@unknown"
207 sentry_sdk.init(
208 dsn=dsn,
209 # Scrub sensitive data
210 before_send=self._scrub_sensitive_data,
211 before_send_transaction=self._scrub_sensitive_data,
212 # Sample rate for performance monitoring
213 traces_sample_rate=TELEMETRY_SAMPLE_RATE,
214 # Release tracking
215 release=release,
216 # Environment
217 environment=os.getenv(
218 f"{self._package_name.replace('-', '_').upper()}_ENVIRONMENT", "production"
219 ),
220 # Disable automatic user tracking
221 send_default_pii=False,
222 # Integrations
223 integrations=[
224 LoggingIntegration(
225 level=None, # Don't capture logs
226 event_level=None,
227 ),
228 ],
229 # Don't attach stack locals (might contain sensitive data)
230 attach_stacktrace=False,
231 )
233 # Set anonymous user context
234 sentry_sdk.set_user({"id": self._get_anonymous_id()})
236 # Set global tags
237 sentry_sdk.set_tag("python_version", platform.python_version())
238 sentry_sdk.set_tag("platform", platform.system())
239 sentry_sdk.set_tag("platform_version", platform.version())
241 self._initialized = True
242 self._enabled = True
244 except Exception:
245 # Silently fail if Sentry initialization fails
246 logger.debug("Failed to initialize Sentry", exc_info=True)
248 def _scrub_sensitive_data(
249 self,
250 event: Event,
251 hint: Hint, # noqa: ARG002
252 ) -> Optional[Event]:
253 """Scrub sensitive data from Sentry events."""
254 if not event: 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true
255 return None
257 # Work directly with event as it's dict-like
258 # Scrub file paths in stack traces
259 self._scrub_stacktraces(event)
261 # Scrub breadcrumbs
262 self._scrub_breadcrumbs(event)
264 # Scrub extra context
265 if "extra" in event and isinstance(event["extra"], dict):
266 event["extra"] = self._scrub_dict(event["extra"])
268 # Scrub user data (though we shouldn't have any)
269 if "user" in event:
270 event["user"] = {"id": self._get_anonymous_id()}
272 # Remove request data and server name
273 event.pop("request", None)
274 event.pop("server_name", None)
276 return event
278 def _scrub_stacktraces(self, event_dict: Event) -> None:
279 """Scrub sensitive data from stack traces."""
280 if "exception" not in event_dict:
281 return
283 exceptions = event_dict.get("exception", {})
284 if not isinstance(exceptions, dict) or "values" not in exceptions:
285 return
287 for exception in exceptions.get("values", []):
288 stacktrace = exception.get("stacktrace", {})
289 if isinstance(stacktrace, dict) and "frames" in stacktrace: 289 ↛ 287line 289 didn't jump to line 287 because the condition on line 289 was always true
290 for frame in stacktrace.get("frames", []):
291 if "filename" in frame: 291 ↛ 293line 291 didn't jump to line 293 because the condition on line 291 was always true
292 frame["filename"] = self._anonymize_path(frame["filename"])
293 frame.pop("vars", None)
295 def _scrub_breadcrumbs(self, event_dict: Event) -> None:
296 """Scrub sensitive data from breadcrumbs."""
297 if "breadcrumbs" not in event_dict:
298 return
300 breadcrumbs = event_dict.get("breadcrumbs", {})
301 if isinstance(breadcrumbs, dict) and "values" in breadcrumbs:
302 for crumb in breadcrumbs.get("values", []):
303 if "data" in crumb and isinstance(crumb["data"], dict):
304 crumb["data"] = self._scrub_dict(crumb["data"])
306 def _anonymize_path(self, filepath: str) -> str:
307 """Anonymize file paths to remove user-specific information."""
308 # Convert to Path for easier manipulation
309 path = Path(filepath)
311 # If it's within site-packages, show relative path
312 for part in path.parts:
313 if "site-packages" in part:
314 idx = path.parts.index(part)
315 return str(Path(*path.parts[idx + 1 :]))
317 # If it contains the package module, show path from there
318 module_name = self._package_name.replace("-", "_")
319 for i, part in enumerate(path.parts):
320 if part == module_name:
321 return str(Path(*path.parts[i:]))
323 # Otherwise just show the filename
324 return path.name
326 def _scrub_dict(self, data: dict[str, Any]) -> dict[str, Any]:
327 """Recursively scrub sensitive information from dictionaries."""
328 scrubbed: dict[str, Any] = {}
329 sensitive_keys = {
330 "password",
331 "token",
332 "key",
333 "secret",
334 "credential",
335 "auth",
336 "api_key",
337 "access_token",
338 "private",
339 "path",
340 "file",
341 "dir",
342 "directory",
343 "home",
344 "user",
345 "username",
346 "email",
347 "host",
348 "hostname",
349 "ip",
350 "address",
351 }
353 for key, value in data.items():
354 # Check if key contains sensitive terms
355 if any(term in key.lower() for term in sensitive_keys):
356 scrubbed[key] = "[REDACTED]"
357 elif isinstance(value, dict):
358 scrubbed[key] = self._scrub_dict(value)
359 elif isinstance(value, str):
360 # Scrub file paths
361 if "/" in value or "\\" in value: 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true
362 scrubbed[key] = "[PATH]"
363 else:
364 scrubbed[key] = value
365 else:
366 scrubbed[key] = value
368 return scrubbed
    @property
    def enabled(self) -> bool:
        """Return whether telemetry is currently enabled for this client.

        Reflects the internal ``_enabled`` flag, which is set by
        ``set_consent`` and by a successful Sentry initialization.
        """
        return self._enabled
375 def set_consent(self, enabled: bool, prompt_shown: bool = True) -> None:
376 """Set telemetry consent and persist it."""
377 # Create config directory if it doesn't exist
378 self._consent_file.parent.mkdir(parents=True, exist_ok=True)
380 # Load existing data or create new
381 consent_data = {}
382 if self._consent_file.exists():
383 try:
384 with self._consent_file.open() as f:
385 consent_data = json.load(f)
386 except (OSError, json.JSONDecodeError):
387 pass
389 # Update consent data
390 consent_data.update(
391 {
392 "telemetry_enabled": enabled,
393 "anonymous_id": self._get_anonymous_id(),
394 "consent_timestamp": datetime.now(timezone.utc).isoformat(),
395 "prompt_shown": prompt_shown,
396 }
397 )
399 # Write consent file
400 with self._consent_file.open("w") as f:
401 json.dump(consent_data, f, indent=JSON_INDENT)
403 # Update internal state
404 self._enabled = enabled
405 if enabled and not self._initialized:
406 self._initialize_sentry()
407 elif not enabled and self._initialized: 407 ↛ 409line 407 didn't jump to line 409 because the condition on line 407 was never true
408 # Disable Sentry
409 if _is_sentry_available():
410 sentry_sdk.init(dsn=None)
411 self._initialized = False
413 def track_command(
414 self, command: str, success: bool = True, duration: Optional[float] = None
415 ) -> None:
416 """Track CLI command execution."""
417 if not self._enabled or not _is_sentry_available():
418 return
420 try:
421 # Track as custom event
422 sentry_sdk.capture_message(
423 f"Command executed: {command}",
424 level="info",
425 extras={
426 "command": command,
427 "success": success,
428 "duration": duration,
429 },
430 )
432 # Also track as metric
433 tags = {
434 "command": command,
435 "success": str(success),
436 }
438 # Increment counter
439 sentry_sdk.metrics.incr(
440 key="cli.command.executed",
441 value=1,
442 tags=tags,
443 )
445 # Track duration if provided
446 if duration is not None:
447 sentry_sdk.metrics.distribution(
448 key="cli.command.duration",
449 value=duration,
450 unit="second",
451 tags=tags,
452 )
454 except Exception:
455 # Never let telemetry break the application
456 logger.debug("Failed to track telemetry", exc_info=True)
458 def track_error(self, error: Exception, context: Optional[dict[str, Any]] = None) -> None:
459 """Track errors with context."""
460 if not self._enabled or not _is_sentry_available():
461 return
463 try:
464 # Add context if provided
465 if context:
466 scrubbed_context = self._scrub_dict(context)
467 with sentry_sdk.push_scope() as scope:
468 for key, value in scrubbed_context.items():
469 scope.set_extra(key, value)
470 sentry_sdk.capture_exception(error)
471 else:
472 sentry_sdk.capture_exception(error)
474 except Exception:
475 # Never let telemetry break the application
476 logger.debug("Failed to track telemetry", exc_info=True)
478 def track_metric(
479 self,
480 name: str,
481 value: float,
482 unit: Optional[str] = None,
483 tags: Optional[dict[str, str]] = None,
484 ) -> None:
485 """Track custom metrics."""
486 if not self._enabled or not _is_sentry_available():
487 return
489 try:
490 if unit is not None:
491 sentry_sdk.metrics.distribution(
492 key=f"custom.{name}",
493 value=value,
494 unit=unit,
495 tags=tags or {},
496 )
497 else:
498 sentry_sdk.metrics.distribution(
499 key=f"custom.{name}",
500 value=value,
501 tags=tags or {},
502 )
503 except Exception:
504 # Never let telemetry break the application
505 logger.debug("Failed to track telemetry", exc_info=True)
507 def flush(self, timeout: float = DEFAULT_TELEMETRY_FLUSH_TIMEOUT) -> None:
508 """Flush pending telemetry events."""
509 if self._enabled and _is_sentry_available():
510 with contextlib.suppress(Exception):
511 sentry_sdk.flush(timeout=timeout)
# Global telemetry instance holder
class _TelemetryClientHolder:
    """Namespace holding the process-wide ``TelemetryClient`` instance."""

    # None until ``get_telemetry_client`` creates the client on first use.
    client: Optional[TelemetryClient] = None
519# Test helper to set SENTRY_AVAILABLE
520def _set_sentry_available(value: bool) -> None:
521 """Set SENTRY_AVAILABLE for testing. Not part of public API."""
522 globals()["SENTRY_AVAILABLE"] = value
525# Helper to check SENTRY_AVAILABLE from within module
526def _is_sentry_available() -> bool:
527 """Check if sentry is available. Used internally to allow test mocking."""
528 return bool(globals().get("SENTRY_AVAILABLE", False))
def get_telemetry_client() -> TelemetryClient:
    """Return the shared telemetry client, constructing it on first call."""
    holder = _TelemetryClientHolder
    client = holder.client
    if client is None:
        client = holder.client = TelemetryClient()
    return client
def track_command_execution(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorate a command so each call is reported to telemetry.

    The wrapper measures how long ``func`` runs, reports success or failure
    through the global telemetry client under ``func``'s name, and, on
    failure, also reports the exception before re-raising it unchanged.

    Telemetry must never break the command: if the telemetry client cannot be
    obtained, ``func`` is simply called without any tracking.

    Args:
        func: The command callable to wrap.

    Returns:
        A wrapper with the same call signature and return value as ``func``.
    """
    # Local import keeps this block self-contained; perf_counter is monotonic,
    # so durations are not distorted by wall-clock adjustments.
    import time

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            client = get_telemetry_client()
        except Exception:
            logging.getLogger(__name__).debug(
                "Failed to obtain telemetry client", exc_info=True
            )
            client = None

        if client is None:
            return func(*args, **kwargs)

        command_name = getattr(func, "__name__", "unknown")
        started = time.perf_counter()

        try:
            result = func(*args, **kwargs)
        except Exception as e:
            # Track failed execution, then let the original error propagate.
            duration = time.perf_counter() - started
            client.track_command(command_name, success=False, duration=duration)
            client.track_error(e, {"command": command_name})
            raise
        else:
            # Track successful execution
            duration = time.perf_counter() - started
            client.track_command(command_name, success=True, duration=duration)
            return result

    return wrapper