|
| 1 | +""" |
| 2 | +Example: Live Transcription via SageMaker (Listen V1) |
| 3 | +
|
| 4 | +This example shows how to use a custom transport to stream audio for real-time |
| 5 | +transcription through a SageMaker endpoint running Deepgram, instead of the |
| 6 | +Deepgram Cloud WebSocket API. |
| 7 | +
|
| 8 | +The SageMaker transport uses HTTP/2 bidirectional streaming under the hood, |
| 9 | +but exposes the same SDK interface — just swap in a transport_factory. |
| 10 | +
|
| 11 | +**Async-only** — the SageMaker transport requires ``AsyncDeepgramClient``. |
| 12 | +It cannot be used with the sync ``DeepgramClient``. |
| 13 | +
|
| 14 | +Requirements:: |
| 15 | +
|
| 16 | + pip install sagemaker-runtime-http2 boto3 |
| 17 | +
|
| 18 | +Environment: |
| 19 | + AWS credentials must be configured (via environment variables, |
| 20 | + ``~/.aws/credentials``, or an IAM role). |
| 21 | + Set ``SAGEMAKER_ENDPOINT_NAME`` and ``AWS_REGION`` in ``.env`` or your shell. |
| 22 | +""" |
| 23 | + |
| 24 | +import asyncio |
| 25 | +import os |
| 26 | +from typing import Union |
| 27 | + |
| 28 | +from dotenv import load_dotenv |
| 29 | + |
| 30 | +load_dotenv() |
| 31 | + |
| 32 | +from deepgram import AsyncDeepgramClient |
| 33 | +from deepgram.core.events import EventType |
| 34 | +from deepgram.listen.v1.types import ( |
| 35 | + ListenV1Metadata, |
| 36 | + ListenV1Results, |
| 37 | + ListenV1SpeechStarted, |
| 38 | + ListenV1UtteranceEnd, |
| 39 | +) |
| 40 | +from deepgram.transports.sagemaker import SageMakerTransportFactory |
| 41 | + |
# Every message type the Listen V1 socket can deliver to the MESSAGE handler.
ListenV1SocketClientResponse = Union[
    ListenV1Results,
    ListenV1Metadata,
    ListenV1UtteranceEnd,
    ListenV1SpeechStarted,
]

# ---------------------------------------------------------------------------
# Settings (each can be overridden through the environment / .env file)
# ---------------------------------------------------------------------------
SAGEMAKER_ENDPOINT = os.environ.get("SAGEMAKER_ENDPOINT_NAME", "deepgram-nova-3")
SAGEMAKER_REGION = os.environ.get("AWS_REGION", "us-west-2")

CHUNK_SIZE = 512_000  # bytes of audio per chunk (512 KB)
CHUNK_DELAY = 0.5  # pause between consecutive chunks, in seconds

# ---------------------------------------------------------------------------
# Build the async client on top of the SageMaker transport
# ---------------------------------------------------------------------------
factory = SageMakerTransportFactory(endpoint_name=SAGEMAKER_ENDPOINT, region=SAGEMAKER_REGION)

# Authentication goes through AWS credentials rather than a Deepgram API key,
# so the api_key argument is only a placeholder.
client = AsyncDeepgramClient(api_key="unused", transport_factory=factory)
| 63 | + |
| 64 | + |
async def main() -> None:
    """Stream a WAV fixture through the SageMaker transport and print transcripts.

    Loads ``fixtures/audio.wav`` (located next to this file), opens a Listen V1
    connection via the module-level ``client``, sends the audio in
    ``CHUNK_SIZE``-byte chunks with ``CHUNK_DELAY`` seconds between them, sends
    a finalize message, and then waits a few seconds for trailing responses.

    Exceptions derived from ``Exception`` are caught and printed as
    ``Error: ...`` instead of being raised to the caller.
    """
    try:
        # Load the audio before connecting: a missing fixture fails fast, and
        # the blocking file read does not stall the event loop while the
        # listener task is running.
        audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav")
        with open(audio_path, "rb") as audio_file:
            audio_data = audio_file.read()

        async with client.listen.v1.connect(model="nova-3") as connection:

            def on_message(message: ListenV1SocketClientResponse) -> None:
                """Print the event type and, for Results events, the transcript."""
                msg_type = getattr(message, "type", "Unknown")
                print(f"Received {msg_type} event")

                # Only Results events carry transcription text.
                if isinstance(message, ListenV1Results):
                    if message.channel and message.channel.alternatives:
                        transcript = message.channel.alternatives[0].transcript
                        if transcript:
                            print(f"Transcript: {transcript}")

            connection.on(EventType.OPEN, lambda _: print("Connection opened"))
            connection.on(EventType.MESSAGE, on_message)
            connection.on(EventType.CLOSE, lambda _: print("Connection closed"))
            connection.on(EventType.ERROR, lambda error: print(f"Error: {error}"))

            # Start listening in a background task so audio can be sent concurrently.
            listen_task = asyncio.create_task(connection.start_listening())
            try:
                # Give the connection a moment to establish.
                await asyncio.sleep(1)

                print(f"Sending {len(audio_data)} bytes in {CHUNK_SIZE}-byte chunks...")
                for i in range(0, len(audio_data), CHUNK_SIZE):
                    chunk = audio_data[i : i + CHUNK_SIZE]
                    await connection.send_media(chunk)
                    print(f"Sent chunk {i // CHUNK_SIZE + 1} ({len(chunk)} bytes)")
                    await asyncio.sleep(CHUNK_DELAY)

                # Signal end of audio.
                await connection.send_finalize()
                print("Finished sending audio")

                # Wait for final responses.
                await asyncio.sleep(5)
            finally:
                # Always stop the listener, including when sending fails, and
                # await it so the cancellation completes and any exception the
                # listener raised is surfaced instead of being dropped.
                listen_task.cancel()
                try:
                    await listen_task
                except asyncio.CancelledError:
                    pass

    except Exception as e:
        print(f"Error: {e}")


asyncio.run(main())
0 commit comments