diff --git a/src/deepgram/extensions/core/instrumented_http.py b/src/deepgram/extensions/core/instrumented_http.py index 214683ac..418e5bbd 100644 --- a/src/deepgram/extensions/core/instrumented_http.py +++ b/src/deepgram/extensions/core/instrumented_http.py @@ -140,33 +140,47 @@ def request( force_multipart=force_multipart, ) duration_ms = (time.perf_counter() - start) * 1000.0 - try: - if self._events is not None: - response_headers = typing.cast(typing.Union[typing.Mapping[str, str], None], getattr(resp, "headers", None)) - # Filter response headers for telemetry extras - try: - from .telemetry_events import ( - capture_response_details, - # filter_sensitive_headers, # No longer needed - using privacy-focused capture - ) - # No longer filter response headers - use privacy-focused response_details instead - extras = None - response_details = capture_response_details(resp) - except Exception: - extras = None - response_details = None + + # Capture response details in a background thread to avoid blocking + # We capture minimal sync data here and defer detailed processing + if self._events is not None: + try: + import threading - self._events.on_http_response( - method=method, - url=url or "", - status_code=resp.status_code, - duration_ms=duration_ms, - headers=response_headers, - extras=extras, - response_details=response_details, - ) - except Exception: - pass + def capture_and_emit(): + try: + response_headers = typing.cast(typing.Union[typing.Mapping[str, str], None], getattr(resp, "headers", None)) + # Filter response headers for telemetry extras + try: + from .telemetry_events import ( + capture_response_details, + # filter_sensitive_headers, # No longer needed - using privacy-focused capture + ) + # No longer filter response headers - use privacy-focused response_details instead + extras = None + response_details = capture_response_details(resp) + except Exception: + extras = None + response_details = None + + self._events.on_http_response( + method=method, 
+ url=url or "", + status_code=resp.status_code, + duration_ms=duration_ms, + headers=response_headers, + extras=extras, + response_details=response_details, + ) + except Exception: + pass + + # Emit telemetry in background thread so we don't block the response + thread = threading.Thread(target=capture_and_emit, daemon=True, name="dg-telemetry-capture") + thread.start() + except Exception: + pass + return resp except Exception as exc: duration_ms = (time.perf_counter() - start) * 1000.0 @@ -311,33 +325,47 @@ async def request( force_multipart=force_multipart, ) duration_ms = (time.perf_counter() - start) * 1000.0 - try: - if self._events is not None: - response_headers = typing.cast(typing.Union[typing.Mapping[str, str], None], getattr(resp, "headers", None)) - # Filter response headers for telemetry extras - try: - from .telemetry_events import ( - capture_response_details, - # filter_sensitive_headers, # No longer needed - using privacy-focused capture - ) - # No longer filter response headers - use privacy-focused response_details instead - extras = None - response_details = capture_response_details(resp) - except Exception: - extras = None - response_details = None + + # Capture response details in a background thread to avoid blocking + # We capture minimal sync data here and defer detailed processing + if self._events is not None: + try: + import threading - self._events.on_http_response( - method=method, - url=url or "", - status_code=resp.status_code, - duration_ms=duration_ms, - headers=response_headers, - extras=extras, - response_details=response_details, - ) - except Exception: - pass + def capture_and_emit(): + try: + response_headers = typing.cast(typing.Union[typing.Mapping[str, str], None], getattr(resp, "headers", None)) + # Filter response headers for telemetry extras + try: + from .telemetry_events import ( + capture_response_details, + # filter_sensitive_headers, # No longer needed - using privacy-focused capture + ) + # No longer filter 
response headers - use privacy-focused response_details instead + extras = None + response_details = capture_response_details(resp) + except Exception: + extras = None + response_details = None + + self._events.on_http_response( + method=method, + url=url or "", + status_code=resp.status_code, + duration_ms=duration_ms, + headers=response_headers, + extras=extras, + response_details=response_details, + ) + except Exception: + pass + + # Emit telemetry in background thread so we don't block the response + thread = threading.Thread(target=capture_and_emit, daemon=True, name="dg-telemetry-capture") + thread.start() + except Exception: + pass + return resp except Exception as exc: duration_ms = (time.perf_counter() - start) * 1000.0 diff --git a/src/deepgram/extensions/core/instrumented_socket.py b/src/deepgram/extensions/core/instrumented_socket.py index 128bec9d..42da3309 100644 --- a/src/deepgram/extensions/core/instrumented_socket.py +++ b/src/deepgram/extensions/core/instrumented_socket.py @@ -107,43 +107,65 @@ def instrumented_connect(uri, *args, additional_headers: Union[typing.Dict[str, connection_kwargs=kwargs, ) - # Emit connect event - if events: - try: - events.on_ws_connect( - url=str(uri), - headers=additional_headers, - request_details=request_details, - ) - except Exception: - pass - try: - # Call original connect + # Call original connect first - don't block on telemetry connection = original_connect(uri, *args, additional_headers=additional_headers, **kwargs) + # Emit connect event in background after connection is established + if events: + try: + import threading + + def emit_connect(): + try: + events.on_ws_connect( + url=str(uri), + headers=additional_headers, + request_details=request_details, + ) + except Exception: + pass + + thread = threading.Thread(target=emit_connect, daemon=True, name="dg-ws-telemetry") + thread.start() + except Exception: + pass + # Wrap the connection to capture close event if events: original_close = connection.close 
def instrumented_close(*close_args, **close_kwargs): - duration_ms = (time.perf_counter() - start_time) * 1000 - response_details = _capture_response_details( - status_code=1000, # Normal close - duration_ms=duration_ms - ) + # Close the connection first + result = original_close(*close_args, **close_kwargs) + # Then emit telemetry in background try: - events.on_ws_close( - url=str(uri), - duration_ms=duration_ms, - request_details=request_details, - response_details=response_details, - ) + import threading + + def emit_close(): + try: + duration_ms = (time.perf_counter() - start_time) * 1000 + response_details = _capture_response_details( + status_code=1000, # Normal close + duration_ms=duration_ms + ) + + events.on_ws_close( + url=str(uri), + duration_ms=duration_ms, + request_details=request_details, + response_details=response_details, + ) + except Exception: + pass + + thread = threading.Thread(target=emit_close, daemon=True, name="dg-ws-telemetry") + thread.start() except Exception: pass - return original_close(*close_args, **close_kwargs) + return result connection.close = instrumented_close @@ -154,64 +176,75 @@ def instrumented_close(*close_args, **close_kwargs): duration_ms = (time.perf_counter() - start_time) * 1000 - # Capture detailed error information - response_details = _capture_response_details( - error=error, - duration_ms=duration_ms, - error_type=type(error).__name__, - error_message=str(error), - stack_trace=traceback.format_exc(), - function_name="websockets.sync.client.connect", - timeout_occurred="timeout" in str(error).lower() or "timed out" in str(error).lower(), - ) - - # Capture WebSocket handshake response headers if available - try: - # Handle InvalidStatusCode exceptions (handshake failures) - if error.__class__.__name__ == 'InvalidStatusCode': - # Status code is directly available - if hasattr(error, 'status_code'): - response_details["handshake_status_code"] = error.status_code - - # Headers are directly available as e.headers - if 
hasattr(error, 'headers') and error.headers: - response_details["handshake_response_headers"] = dict(error.headers) - - # Some versions might have response_headers - elif hasattr(error, 'response_headers') and error.response_headers: - response_details["handshake_response_headers"] = dict(error.response_headers) - - # Handle InvalidHandshake exceptions (protocol-level failures) - elif error.__class__.__name__ == 'InvalidHandshake': - response_details["handshake_error_type"] = "InvalidHandshake" - if hasattr(error, 'headers') and error.headers: - response_details["handshake_response_headers"] = dict(error.headers) - - # Generic fallback for any exception with headers - elif hasattr(error, 'headers') and error.headers: - response_details["handshake_response_headers"] = dict(error.headers) - elif hasattr(error, 'response_headers') and error.response_headers: - response_details["handshake_response_headers"] = dict(error.response_headers) - - # Capture status code if available (for any exception type) - if hasattr(error, 'status_code') and not response_details.get("handshake_status_code"): - response_details["handshake_status_code"] = error.status_code - - except Exception: - # Don't let header extraction fail the error handling - pass - + # Emit error telemetry in background, then re-raise immediately if events: try: - events.on_ws_error( - url=str(uri), - error=error, - duration_ms=duration_ms, - request_details=request_details, - response_details=response_details, - ) + import threading + + def emit_error(error=error): # Bind the exception now: a name bound by "except ... as" is cleared when the handler exits, which can happen before this thread runs (assumes the unseen handler binds error that way) + try: + # Capture detailed error information + response_details = _capture_response_details( + error=error, + duration_ms=duration_ms, + error_type=type(error).__name__, + error_message=str(error), + stack_trace="".join(traceback.format_exception(type(error), error, error.__traceback__)), # format_exc() only sees the calling thread's active exception, so it yields no trace inside this worker + function_name="websockets.sync.client.connect", + timeout_occurred="timeout" in str(error).lower() or "timed out" in str(error).lower(), + ) + + # Capture WebSocket handshake response headers if available + try: + # Handle 
InvalidStatusCode exceptions (handshake failures) + if error.__class__.__name__ == 'InvalidStatusCode': + # Status code is directly available + if hasattr(error, 'status_code'): + response_details["handshake_status_code"] = error.status_code + + # Headers are directly available as e.headers + if hasattr(error, 'headers') and error.headers: + response_details["handshake_response_headers"] = dict(error.headers) + + # Some versions might have response_headers + elif hasattr(error, 'response_headers') and error.response_headers: + response_details["handshake_response_headers"] = dict(error.response_headers) + + # Handle InvalidHandshake exceptions (protocol-level failures) + elif error.__class__.__name__ == 'InvalidHandshake': + response_details["handshake_error_type"] = "InvalidHandshake" + if hasattr(error, 'headers') and error.headers: + response_details["handshake_response_headers"] = dict(error.headers) + + # Generic fallback for any exception with headers + elif hasattr(error, 'headers') and error.headers: + response_details["handshake_response_headers"] = dict(error.headers) + elif hasattr(error, 'response_headers') and error.response_headers: + response_details["handshake_response_headers"] = dict(error.response_headers) + + # Capture status code if available (for any exception type) + if hasattr(error, 'status_code') and not response_details.get("handshake_status_code"): + response_details["handshake_status_code"] = error.status_code + + except Exception: + # Don't let header extraction fail the error handling + pass + + events.on_ws_error( + url=str(uri), + error=error, + duration_ms=duration_ms, + request_details=request_details, + response_details=response_details, + ) + except Exception: + pass + + thread = threading.Thread(target=emit_error, daemon=True, name="dg-ws-telemetry") + thread.start() except Exception: pass + raise return instrumented_connect @@ -234,45 +267,67 @@ def instrumented_connect(uri, *args, extra_headers: Union[typing.Dict[str, str], 
connection_kwargs=kwargs, ) - # Emit connect event - if events: - try: - events.on_ws_connect( - url=str(uri), - headers=extra_headers, - request_details=request_details, - ) - except Exception: - pass - # Return an async context manager @asynccontextmanager async def instrumented_context(): try: - # Call original connect + # Call original connect - don't block on telemetry async with original_connect(uri, *args, extra_headers=extra_headers, **kwargs) as connection: + # Emit connect event in background after connection is established + if events: + try: + import threading + + def emit_connect(): + try: + events.on_ws_connect( + url=str(uri), + headers=extra_headers, + request_details=request_details, + ) + except Exception: + pass + + thread = threading.Thread(target=emit_connect, daemon=True, name="dg-ws-telemetry") + thread.start() + except Exception: + pass + # Wrap the connection to capture close event if events: original_close = connection.close async def instrumented_close(*close_args, **close_kwargs): - duration_ms = (time.perf_counter() - start_time) * 1000 - response_details = _capture_response_details( - status_code=1000, # Normal close - duration_ms=duration_ms - ) + # Close the connection first + result = await original_close(*close_args, **close_kwargs) + # Then emit telemetry in background try: - events.on_ws_close( - url=str(uri), - duration_ms=duration_ms, - request_details=request_details, - response_details=response_details, - ) + import threading + + def emit_close(): + try: + duration_ms = (time.perf_counter() - start_time) * 1000 + response_details = _capture_response_details( + status_code=1000, # Normal close + duration_ms=duration_ms + ) + + events.on_ws_close( + url=str(uri), + duration_ms=duration_ms, + request_details=request_details, + response_details=response_details, + ) + except Exception: + pass + + thread = threading.Thread(target=emit_close, daemon=True, name="dg-ws-telemetry") + thread.start() except Exception: pass - return 
await original_close(*close_args, **close_kwargs) + return result connection.close = instrumented_close @@ -281,17 +336,26 @@ async def instrumented_close(*close_args, **close_kwargs): # Also emit close event when context exits (if connection wasn't manually closed) if events: try: - duration_ms = (time.perf_counter() - start_time) * 1000 - response_details = _capture_response_details( - status_code=1000, # Normal close - duration_ms=duration_ms - ) - events.on_ws_close( - url=str(uri), - duration_ms=duration_ms, - request_details=request_details, - response_details=response_details, - ) + import threading + + def emit_close(): + try: + duration_ms = (time.perf_counter() - start_time) * 1000 + response_details = _capture_response_details( + status_code=1000, # Normal close + duration_ms=duration_ms + ) + events.on_ws_close( + url=str(uri), + duration_ms=duration_ms, + request_details=request_details, + response_details=response_details, + ) + except Exception: + pass + + thread = threading.Thread(target=emit_close, daemon=True, name="dg-ws-telemetry") + thread.start() except Exception: pass @@ -300,64 +364,75 @@ async def instrumented_close(*close_args, **close_kwargs): duration_ms = (time.perf_counter() - start_time) * 1000 - # Capture detailed error information - response_details = _capture_response_details( - error=error, - duration_ms=duration_ms, - error_type=type(error).__name__, - error_message=str(error), - stack_trace=traceback.format_exc(), - function_name="websockets.client.connect", - timeout_occurred="timeout" in str(error).lower() or "timed out" in str(error).lower(), - ) - - # Capture WebSocket handshake response headers if available - try: - # Handle InvalidStatusCode exceptions (handshake failures) - if error.__class__.__name__ == 'InvalidStatusCode': - # Status code is directly available - if hasattr(error, 'status_code'): - response_details["handshake_status_code"] = error.status_code - - # Headers are directly available as e.headers - if 
hasattr(error, 'headers') and error.headers: - response_details["handshake_response_headers"] = dict(error.headers) - - # Some versions might have response_headers - elif hasattr(error, 'response_headers') and error.response_headers: - response_details["handshake_response_headers"] = dict(error.response_headers) - - # Handle InvalidHandshake exceptions (protocol-level failures) - elif error.__class__.__name__ == 'InvalidHandshake': - response_details["handshake_error_type"] = "InvalidHandshake" - if hasattr(error, 'headers') and error.headers: - response_details["handshake_response_headers"] = dict(error.headers) - - # Generic fallback for any exception with headers - elif hasattr(error, 'headers') and error.headers: - response_details["handshake_response_headers"] = dict(error.headers) - elif hasattr(error, 'response_headers') and error.response_headers: - response_details["handshake_response_headers"] = dict(error.response_headers) - - # Capture status code if available (for any exception type) - if hasattr(error, 'status_code') and not response_details.get("handshake_status_code"): - response_details["handshake_status_code"] = error.status_code - - except Exception: - # Don't let header extraction fail the error handling - pass - + # Emit error telemetry in background, then re-raise immediately if events: try: - events.on_ws_error( - url=str(uri), - error=error, - duration_ms=duration_ms, - request_details=request_details, - response_details=response_details, - ) + import threading + + def emit_error(error=error): # Bind the exception now: a name bound by "except ... as" is cleared when the handler exits, which can happen before this thread runs (assumes the unseen handler binds error that way) + try: + # Capture detailed error information + response_details = _capture_response_details( + error=error, + duration_ms=duration_ms, + error_type=type(error).__name__, + error_message=str(error), + stack_trace="".join(traceback.format_exception(type(error), error, error.__traceback__)), # format_exc() only sees the calling thread's active exception, so it yields no trace inside this worker + function_name="websockets.client.connect", + timeout_occurred="timeout" in str(error).lower() or "timed out" in str(error).lower(), + ) + + # Capture WebSocket handshake response headers if available + try: + # Handle 
InvalidStatusCode exceptions (handshake failures) + if error.__class__.__name__ == 'InvalidStatusCode': + # Status code is directly available + if hasattr(error, 'status_code'): + response_details["handshake_status_code"] = error.status_code + + # Headers are directly available as e.headers + if hasattr(error, 'headers') and error.headers: + response_details["handshake_response_headers"] = dict(error.headers) + + # Some versions might have response_headers + elif hasattr(error, 'response_headers') and error.response_headers: + response_details["handshake_response_headers"] = dict(error.response_headers) + + # Handle InvalidHandshake exceptions (protocol-level failures) + elif error.__class__.__name__ == 'InvalidHandshake': + response_details["handshake_error_type"] = "InvalidHandshake" + if hasattr(error, 'headers') and error.headers: + response_details["handshake_response_headers"] = dict(error.headers) + + # Generic fallback for any exception with headers + elif hasattr(error, 'headers') and error.headers: + response_details["handshake_response_headers"] = dict(error.headers) + elif hasattr(error, 'response_headers') and error.response_headers: + response_details["handshake_response_headers"] = dict(error.response_headers) + + # Capture status code if available (for any exception type) + if hasattr(error, 'status_code') and not response_details.get("handshake_status_code"): + response_details["handshake_status_code"] = error.status_code + + except Exception: + # Don't let header extraction fail the error handling + pass + + events.on_ws_error( + url=str(uri), + error=error, + duration_ms=duration_ms, + request_details=request_details, + response_details=response_details, + ) + except Exception: + pass + + thread = threading.Thread(target=emit_error, daemon=True, name="dg-ws-telemetry") + thread.start() except Exception: pass + raise return instrumented_context() diff --git a/tests/performance/__init__.py b/tests/performance/__init__.py new file mode 100644 index 
00000000..97e2a154 --- /dev/null +++ b/tests/performance/__init__.py @@ -0,0 +1 @@ +"""Performance tests for Deepgram SDK.""" diff --git a/tests/performance/conftest.py b/tests/performance/conftest.py new file mode 100644 index 00000000..b43aebe1 --- /dev/null +++ b/tests/performance/conftest.py @@ -0,0 +1,56 @@ +"""Shared configuration and fixtures for performance tests.""" + +import os + +import pytest + +try: + from dotenv import load_dotenv + + load_dotenv() +except ImportError: + pass + + +def pytest_configure(config): + """Register custom markers for performance tests.""" + config.addinivalue_line("markers", "performance: mark test as a performance test") + config.addinivalue_line("markers", "requires_api_key: mark test as requiring a real API key") + + +@pytest.fixture(scope="session") +def api_key(): + """Get API key from environment, skip test if not available.""" + api_key = os.getenv("TEST_DEEPGRAM_API_KEY") + if not api_key: + pytest.skip("TEST_DEEPGRAM_API_KEY environment variable not set") + return api_key + + +@pytest.fixture(scope="session") +def deepgram_client(api_key): + """Create a DeepgramClient instance for testing.""" + from deepgram import DeepgramClient + + return DeepgramClient(api_key=api_key) + + +@pytest.fixture +def generate_test_text(): + """Factory fixture to generate test text of specified length.""" + + def _generate(length: int = 1000) -> str: + """Generate test text of approximately the specified length.""" + base_text = ( + "The quick brown fox jumps over the lazy dog. " + "This is a test of the text to speech system. " + "We are measuring performance metrics including time to first byte and time to last byte. 
" + ) + + # Repeat base text to reach desired length + repetitions = (length // len(base_text)) + 1 + text = (base_text * repetitions)[:length] + + return text.strip() + + return _generate diff --git a/tests/performance/test_tts_performance_generate.py b/tests/performance/test_tts_performance_generate.py new file mode 100644 index 00000000..f3b1a4ba --- /dev/null +++ b/tests/performance/test_tts_performance_generate.py @@ -0,0 +1,244 @@ +""" +TTS Performance Test (Non-Streaming): SDK vs Raw httpx + +Compares performance of Deepgram SDK vs raw httpx for Text-to-Speech requests +using the NON-STREAMING approach (reading entire response at once). + +Key Metrics: +- TTFB (Time to First Byte): When first chunk arrives +- TTLB (Time to Last Byte): When all data has been received +- Total bytes received + +Usage: + pytest tests/performance/test_tts_performance_generate.py -v + +Requirements: + - TEST_DEEPGRAM_API_KEY environment variable must be set (or in .env file) + - Test will be skipped if API key is not available +""" + +import time +from dataclasses import dataclass + +import httpx +import pytest + + +@dataclass +class Metrics: + """Performance metrics for a TTS request""" + + test_type: str # "SDK" or "Raw httpx" + ttfb_ms: float # Time to First Byte (milliseconds) + ttlb_ms: float # Time to Last Byte (milliseconds) + total_bytes: int # Total bytes received + error: str = None # Error message if any + + +def _test_raw_httpx_generate(api_key: str, text: str, chunk_size: int = 8192) -> Metrics: + """ + Test raw httpx TTS request (non-streaming but reading in chunks to measure TTFB) + + Timing: + - Start timer when request begins + - TTFB: Record when first chunk of response body arrives + - TTLB: Record when all bytes have been read + + Args: + api_key: Deepgram API key + text: Text to convert to speech + chunk_size: Size of chunks to read + + Returns: + Metrics object with timing data + """ + start_time = time.perf_counter() + ttfb_ms = 0.0 + ttlb_ms = 0.0 + total_bytes 
= 0 + error = None + + try: + with httpx.Client(http2=False) as client: + url = "https://api.deepgram.com/v1/speak" + headers = { + "Authorization": f"Token {api_key}", + "Content-Type": "application/json", + } + json_data = {"text": text} + + # Use streaming to measure TTFB properly, but consume all chunks + with client.stream("POST", url, json=json_data, headers=headers) as response: + response.raise_for_status() + + chunk_count = 0 + for chunk in response.iter_bytes(chunk_size=chunk_size): + current_time = time.perf_counter() + + if chunk_count == 0: + # First chunk received + ttfb_ms = (current_time - start_time) * 1000 + + chunk_count += 1 + total_bytes += len(chunk) + + # All bytes received + ttlb_ms = (time.perf_counter() - start_time) * 1000 + + except Exception as e: + error = str(e) + ttlb_ms = (time.perf_counter() - start_time) * 1000 + + return Metrics( + test_type="Raw httpx (generate)", + ttfb_ms=ttfb_ms, + ttlb_ms=ttlb_ms, + total_bytes=total_bytes, + error=error, + ) + + +def _test_sdk_generate(client, text: str) -> Metrics: + """ + Test SDK TTS request (generate returns iterator - must consume it) + + Timing: + - Start timer when request begins + - TTFB: When first chunk arrives from iterator + - TTLB: When all chunks have been consumed + + Args: + client: DeepgramClient instance + text: Text to convert to speech + + Returns: + Metrics object with timing data + """ + start_time = time.perf_counter() + ttfb_ms = 0.0 + ttlb_ms = 0.0 + total_bytes = 0 + error = None + + try: + # generate() returns an iterator - must consume it to get actual data + response_iterator = client.speak.v1.audio.generate(text=text) + + # Consume the iterator to actually fetch the data + chunk_count = 0 + for chunk in response_iterator: + current_time = time.perf_counter() + + if chunk_count == 0: + # First chunk received + ttfb_ms = (current_time - start_time) * 1000 + + chunk_count += 1 + total_bytes += len(chunk) + + # All chunks received + ttlb_ms = (time.perf_counter() - 
start_time) * 1000 + + except Exception as e: + error = str(e) + ttlb_ms = (time.perf_counter() - start_time) * 1000 + + return Metrics( + test_type="SDK (generate)", + ttfb_ms=ttfb_ms, + ttlb_ms=ttlb_ms, + total_bytes=total_bytes, + error=error, + ) + + +@pytest.mark.performance +@pytest.mark.requires_api_key +def test_tts_generate_performance(api_key, deepgram_client, generate_test_text): + """ + Compare TTS generate (non-streaming) performance between SDK and raw httpx. + + This test measures and compares: + - Time to First Byte (TTFB) + - Time to Last Byte (TTLB) + - Total bytes received + + The test will be skipped if TEST_DEEPGRAM_API_KEY is not set. + """ + # Generate test text + text = generate_test_text(1000) + chunk_size = 8192 + + # Test raw httpx + raw_metrics = _test_raw_httpx_generate(api_key, text, chunk_size) + assert raw_metrics.error is None, f"Raw httpx test failed: {raw_metrics.error}" + assert raw_metrics.total_bytes > 0, "Raw httpx received no data" + + # Test SDK + sdk_metrics = _test_sdk_generate(deepgram_client, text) + assert sdk_metrics.error is None, f"SDK test failed: {sdk_metrics.error}" + assert sdk_metrics.total_bytes > 0, "SDK received no data" + + # Verify both received the same amount of data + assert ( + sdk_metrics.total_bytes == raw_metrics.total_bytes + ), f"Data size mismatch: SDK={sdk_metrics.total_bytes}, Raw={raw_metrics.total_bytes}" + + # Calculate differences + ttfb_diff = sdk_metrics.ttfb_ms - raw_metrics.ttfb_ms + ttlb_diff = sdk_metrics.ttlb_ms - raw_metrics.ttlb_ms + + # Print performance comparison + print("\n" + "=" * 70) + print("TTS GENERATE (NON-STREAMING) PERFORMANCE COMPARISON") + print("=" * 70) + print(f"\nRaw httpx (generate):") + print(f" TTFB: {raw_metrics.ttfb_ms:>8.1f} ms") + print(f" TTLB: {raw_metrics.ttlb_ms:>8.1f} ms") + print(f" Bytes: {raw_metrics.total_bytes:>7,} bytes") + print(f"\nSDK (generate):") + print(f" TTFB: {sdk_metrics.ttfb_ms:>8.1f} ms") + print(f" TTLB: {sdk_metrics.ttlb_ms:>8.1f} ms") + 
print(f" Bytes: {sdk_metrics.total_bytes:>7,} bytes") + print(f"\nDifferences (SDK - Raw):") + print(f" TTFB: {ttfb_diff:>+8.1f} ms") + print(f" TTLB: {ttlb_diff:>+8.1f} ms") + print("=" * 70) + + # Performance assertions (warnings, not failures) + # We expect SDK to be within reasonable overhead + if ttfb_diff > 100: + print( + f"\n⚠️ Warning: SDK TTFB is {ttfb_diff:.0f}ms slower than raw httpx" + ) + if ttlb_diff > 200: + print( + f"\n⚠️ Warning: SDK TTLB is {ttlb_diff:.0f}ms slower than raw httpx" + ) + + +@pytest.mark.performance +@pytest.mark.requires_api_key +@pytest.mark.parametrize("text_length", [100, 500, 1000, 2000]) +def test_tts_generate_performance_various_lengths( + api_key, deepgram_client, generate_test_text, text_length +): + """ + Test TTS generate (non-streaming) performance with various text lengths. + + This test ensures the SDK performs reasonably across different text sizes. + The test will be skipped if TEST_DEEPGRAM_API_KEY is not set. + """ + # Generate test text + text = generate_test_text(text_length) + + # Test SDK + sdk_metrics = _test_sdk_generate(deepgram_client, text) + assert sdk_metrics.error is None, f"SDK test failed: {sdk_metrics.error}" + assert sdk_metrics.total_bytes > 0, "SDK received no data" + + print(f"\n[Text length: {text_length}]") + print(f" TTFB: {sdk_metrics.ttfb_ms:.1f} ms") + print(f" TTLB: {sdk_metrics.ttlb_ms:.1f} ms") + print(f" Bytes: {sdk_metrics.total_bytes:,}") + diff --git a/tests/performance/test_tts_performance_stream.py b/tests/performance/test_tts_performance_stream.py new file mode 100644 index 00000000..0c986c53 --- /dev/null +++ b/tests/performance/test_tts_performance_stream.py @@ -0,0 +1,254 @@ +""" +TTS Performance Test (Streaming): SDK vs Raw httpx + +Compares performance of Deepgram SDK vs raw httpx for Text-to-Speech streaming requests. 
+ +Key Metrics: +- TTFB (Time to First Byte): When first chunk arrives +- TTLB (Time to Last Byte): When all data has been received +- Total bytes received + +Usage: + pytest tests/performance/test_tts_performance_stream.py -v + +Requirements: + - TEST_DEEPGRAM_API_KEY environment variable must be set (or in .env file) + - Test will be skipped if API key is not available +""" + +import time +from dataclasses import dataclass + +import httpx +import pytest + + +@dataclass +class Metrics: + """Performance metrics for a TTS request""" + + test_type: str # "SDK" or "Raw httpx" + ttfb_ms: float # Time to First Byte (milliseconds) + ttlb_ms: float # Time to Last Byte (milliseconds) + total_bytes: int # Total bytes received + chunk_count: int # Number of chunks received + error: str = None # Error message if any + + +def _test_raw_httpx(api_key: str, text: str, chunk_size: int = 8192) -> Metrics: + """ + Test raw httpx TTS streaming request + + Timing: + - Start timer when request begins + - TTFB: Record when first chunk arrives from iter_bytes() + - TTLB: Record when iteration completes + + Args: + api_key: Deepgram API key + text: Text to convert to speech + chunk_size: Size of chunks to read + + Returns: + Metrics object with timing data + """ + start_time = time.perf_counter() + ttfb_ms = 0.0 + ttlb_ms = 0.0 + total_bytes = 0 + chunk_count = 0 + error = None + + try: + with httpx.Client(http2=False) as client: + url = "https://api.deepgram.com/v1/speak" + headers = { + "Authorization": f"Token {api_key}", + "Content-Type": "application/json", + } + json_data = {"text": text} + + with client.stream("POST", url, json=json_data, headers=headers) as response: + response.raise_for_status() + + for chunk in response.iter_bytes(chunk_size=chunk_size): + current_time = time.perf_counter() + + if chunk_count == 0: + # First chunk received + ttfb_ms = (current_time - start_time) * 1000 + + chunk_count += 1 + total_bytes += len(chunk) + + # All chunks received + ttlb_ms = 
(time.perf_counter() - start_time) * 1000 + + except Exception as e: + error = str(e) + ttlb_ms = (time.perf_counter() - start_time) * 1000 + + return Metrics( + test_type="Raw httpx", + ttfb_ms=ttfb_ms, + ttlb_ms=ttlb_ms, + total_bytes=total_bytes, + chunk_count=chunk_count, + error=error, + ) + + +def _test_sdk(client, text: str, chunk_size: int = 8192) -> Metrics: + """ + Test SDK TTS streaming request + + Timing: + - Start timer when request begins + - TTFB: Record when first chunk arrives from http_response.data iterator + - TTLB: Record when iteration completes + + Args: + client: DeepgramClient instance + text: Text to convert to speech + chunk_size: Size of chunks to read + + Returns: + Metrics object with timing data + """ + start_time = time.perf_counter() + ttfb_ms = 0.0 + ttlb_ms = 0.0 + total_bytes = 0 + chunk_count = 0 + error = None + + try: + with client.speak.v1.audio.with_raw_response.generate( + text=text, request_options={"chunk_size": chunk_size} + ) as http_response: + for chunk in http_response.data: + current_time = time.perf_counter() + + if chunk_count == 0: + # First chunk received + ttfb_ms = (current_time - start_time) * 1000 + + chunk_count += 1 + total_bytes += len(chunk) + + # All chunks received + ttlb_ms = (time.perf_counter() - start_time) * 1000 + + except Exception as e: + error = str(e) + ttlb_ms = (time.perf_counter() - start_time) * 1000 + + return Metrics( + test_type="SDK", + ttfb_ms=ttfb_ms, + ttlb_ms=ttlb_ms, + total_bytes=total_bytes, + chunk_count=chunk_count, + error=error, + ) + + +@pytest.mark.performance +@pytest.mark.requires_api_key +def test_tts_streaming_performance(api_key, deepgram_client, generate_test_text): + """ + Compare TTS streaming performance between SDK and raw httpx. + + This test measures and compares: + - Time to First Byte (TTFB) + - Time to Last Byte (TTLB) + - Total bytes received + - Number of chunks + + The test will be skipped if TEST_DEEPGRAM_API_KEY is not set. 
+ """ + # Generate test text + text = generate_test_text(1000) + chunk_size = 8192 + + # Test raw httpx + raw_metrics = _test_raw_httpx(api_key, text, chunk_size) + assert raw_metrics.error is None, f"Raw httpx test failed: {raw_metrics.error}" + assert raw_metrics.total_bytes > 0, "Raw httpx received no data" + assert raw_metrics.chunk_count > 0, "Raw httpx received no chunks" + + # Test SDK + sdk_metrics = _test_sdk(deepgram_client, text, chunk_size) + assert sdk_metrics.error is None, f"SDK test failed: {sdk_metrics.error}" + assert sdk_metrics.total_bytes > 0, "SDK received no data" + assert sdk_metrics.chunk_count > 0, "SDK received no chunks" + + # Verify both received the same amount of data + assert ( + sdk_metrics.total_bytes == raw_metrics.total_bytes + ), f"Data size mismatch: SDK={sdk_metrics.total_bytes}, Raw={raw_metrics.total_bytes}" + + # Calculate differences + ttfb_diff = sdk_metrics.ttfb_ms - raw_metrics.ttfb_ms + ttlb_diff = sdk_metrics.ttlb_ms - raw_metrics.ttlb_ms + + # Print performance comparison + print("\n" + "=" * 70) + print("TTS STREAMING PERFORMANCE COMPARISON") + print("=" * 70) + print(f"\nRaw httpx:") + print(f" TTFB: {raw_metrics.ttfb_ms:>8.1f} ms") + print(f" TTLB: {raw_metrics.ttlb_ms:>8.1f} ms") + print( + f" Bytes: {raw_metrics.total_bytes:>7,} bytes in {raw_metrics.chunk_count} chunks" + ) + print(f"\nSDK:") + print(f" TTFB: {sdk_metrics.ttfb_ms:>8.1f} ms") + print(f" TTLB: {sdk_metrics.ttlb_ms:>8.1f} ms") + print( + f" Bytes: {sdk_metrics.total_bytes:>7,} bytes in {sdk_metrics.chunk_count} chunks" + ) + print(f"\nDifferences (SDK - Raw):") + print(f" TTFB: {ttfb_diff:>+8.1f} ms") + print(f" TTLB: {ttlb_diff:>+8.1f} ms") + print("=" * 70) + + # Performance assertions (warnings, not failures) + # We expect SDK to be within reasonable overhead + if ttfb_diff > 100: + print( + f"\n⚠️ Warning: SDK TTFB is {ttfb_diff:.0f}ms slower than raw httpx" + ) + if ttlb_diff > 200: + print( + f"\n⚠️ Warning: SDK TTLB is {ttlb_diff:.0f}ms 
slower than raw httpx" + ) + + +@pytest.mark.performance +@pytest.mark.requires_api_key +@pytest.mark.parametrize("text_length", [100, 500, 1000, 2000]) +def test_tts_streaming_performance_various_lengths( + api_key, deepgram_client, generate_test_text, text_length +): + """ + Test TTS streaming performance with various text lengths. + + This test ensures the SDK performs reasonably across different text sizes. + The test will be skipped if DEEPGRAM_API_KEY is not set. + """ + # Generate test text + text = generate_test_text(text_length) + chunk_size = 8192 + + # Test SDK + sdk_metrics = _test_sdk(deepgram_client, text, chunk_size) + assert sdk_metrics.error is None, f"SDK test failed: {sdk_metrics.error}" + assert sdk_metrics.total_bytes > 0, "SDK received no data" + assert sdk_metrics.chunk_count > 0, "SDK received no chunks" + + print(f"\n[Text length: {text_length}]") + print(f" TTFB: {sdk_metrics.ttfb_ms:.1f} ms") + print(f" TTLB: {sdk_metrics.ttlb_ms:.1f} ms") + print(f" Bytes: {sdk_metrics.total_bytes:,} in {sdk_metrics.chunk_count} chunks") +