Real-time audio transcription using Parakeet

This example demonstrates the use of Parakeet ASR models for real-time speech-to-text on Modal.

Parakeet is the name of a family of ASR models built using NVIDIA’s NeMo Framework. We’ll show you how to use Parakeet for real-time audio transcription on Modal GPUs, with simple Python and browser clients.

This example uses the nvidia/parakeet-tdt-0.6b-v2 model which, as of June 2025, sits at the top of Hugging Face’s Open ASR leaderboard.

To try out transcription from your terminal, provide a URL for a .wav file to modal run:

modal run 06_gpu_and_ml/audio-to-text/parakeet.py --audio-url="https://github.com/voxserv/audio_quality_testing_samples/raw/refs/heads/master/mono_44100/156550__acclivity__a-dream-within-a-dream.wav"

You should see output like the following:

🎤 Starting Transcription
A Dream Within A Dream Edgar Allan Poe
take this kiss upon the brow, And in parting from you now, Thus much let me avow You are not wrong who deem That my days have been a dream.
...

Running a web service you can hit from any browser isn’t any harder — Modal handles the deployment of both the frontend and backend in a single App! Just run

modal serve 06_gpu_and_ml/audio-to-text/parakeet.py

and go to the link printed in your terminal.
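If you want the endpoint to stick around after you close your terminal, deploy the app instead:

modal deploy 06_gpu_and_ml/audio-to-text/parakeet.py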

The full frontend code can be found here.

Setup

import asyncio
import os
import sys
from pathlib import Path

import modal

app = modal.App("example-parakeet")

Volume for caching model weights

We use a Modal Volume to cache the model weights. This allows us to avoid downloading the model weights every time we start a new instance.

For more on storing models on Modal, see this guide.

model_cache = modal.Volume.from_name("parakeet-model-cache", create_if_missing=True)
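Once the weights have been downloaded on a first run, you can peek at what's been cached with the Modal CLI (the exact contents will depend on what has been stored so far):

modal volume ls parakeet-model-cache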

Configuring dependencies

The model runs remotely inside a container on Modal. We can define the environment and install our Python dependencies in that container’s Image.

For finicky setups like NeMo's, we recommend using the official NVIDIA CUDA Docker images from Docker Hub. You'll need to install Python and pip with the add_python option because the image doesn't include them by default.

Additionally, we install ffmpeg for handling audio data and fastapi to create a web server for our WebSocket.

image = (
    modal.Image.from_registry(
        "nvidia/cuda:12.8.0-cudnn-devel-ubuntu22.04", add_python="3.12"
    )
    .env(
        {
            "HF_HUB_ENABLE_HF_TRANSFER": "1",
            "HF_HOME": "/cache",  # cache directory for Hugging Face models
            "DEBIAN_FRONTEND": "noninteractive",
            "CXX": "g++",
            "CC": "g++",
        }
    )
    .apt_install("ffmpeg")
    .pip_install(
        "hf_transfer==0.1.9",
        "huggingface_hub[hf-xet]==0.31.2",
        "nemo_toolkit[asr]==2.3.0",
        "cuda-python==12.8.0",
        "fastapi==0.115.12",
        "numpy<2",
        "pydub==0.25.1",
    )
    .entrypoint([])  # silence chatty logs from the container on start
    .add_local_dir(  # changes fastest, so make this the last layer
        Path(__file__).parent / "frontend",
        remote_path="/frontend",
    )
)

Implementing real-time audio transcription on Modal

Now we’re ready to implement transcription. We wrap inference in a modal.Cls so that the model is loaded and moved onto the GPU just once, when a new container starts, rather than on every request.

A couple of notes about this code:

  • The transcribe method takes bytes of audio data and returns the transcribed text.
  • The web method creates a FastAPI app using modal.asgi_app that serves a WebSocket endpoint for real-time audio transcription and a browser frontend for transcribing audio from your microphone.
  • The run_with_queue method takes a modal.Queue and passes audio data and transcriptions between our local machine and the GPU container.

Parakeet tries really hard to transcribe everything to English! As a result, it tends to output utterances like “Yeah” or “Mm-hmm” when it runs on silent audio. To avoid passing silence into the model, we pre-process the incoming audio on the server with pydub’s silence detection.
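As a standalone illustration (not part of the app itself), pydub’s detect_silence returns a list of [start_ms, end_ms] windows whose volume stays below a threshold; the handle_audio_chunk method below uses the last such window to decide how much audio is ready to transcribe:

from pydub import AudioSegment
from pydub.silence import detect_silence

# two seconds of synthetic silence: mono, 16-bit, 16 kHz
quiet = AudioSegment.silent(duration=2000, frame_rate=16000)
print(detect_silence(quiet, min_silence_len=1000, silence_thresh=-45))
# prints [[0, 2000]]: a single silent window spanning the whole segment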

END_OF_STREAM = (
    b"END_OF_STREAM_8f13d09"  # byte sequence indicating a stream is finished
)


@app.cls(volumes={"/cache": model_cache}, gpu="a10g", image=image)
@modal.concurrent(max_inputs=14, target_inputs=10)
class Parakeet:
    @modal.enter()
    def load(self):
        import logging

        import nemo.collections.asr as nemo_asr

        # silence chatty logs from nemo
        logging.getLogger("nemo_logger").setLevel(logging.CRITICAL)

        self.model = nemo_asr.models.ASRModel.from_pretrained(
            model_name="nvidia/parakeet-tdt-0.6b-v2"
        )

    def transcribe(self, audio_bytes: bytes) -> str:
        import numpy as np

        audio_data = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32)

        with NoStdStreams():  # hide output, see https://github.com/NVIDIA/NeMo/discussions/3281#discussioncomment-2251217
            output = self.model.transcribe([audio_data])

        return output[0].text

    @modal.asgi_app()
    def web(self):
        from fastapi import FastAPI, Response, WebSocket
        from fastapi.responses import HTMLResponse
        from fastapi.staticfiles import StaticFiles

        web_app = FastAPI()
        web_app.mount("/static", StaticFiles(directory="/frontend"))

        @web_app.get("/status")
        async def status():
            return Response(status_code=200)

        # serve frontend
        @web_app.get("/")
        async def index():
            return HTMLResponse(content=open("/frontend/index.html").read())

        @web_app.websocket("/ws")
        async def run_with_websocket(ws: WebSocket):
            from fastapi import WebSocketDisconnect
            from pydub import AudioSegment

            await ws.accept()

            # initialize an empty audio segment
            audio_segment = AudioSegment.empty()

            try:
                while True:
                    # receive a chunk of audio data and convert it to an audio segment
                    chunk = await ws.receive_bytes()
                    if chunk == END_OF_STREAM:
                        await ws.send_bytes(END_OF_STREAM)
                        break
                    audio_segment, text = await self.handle_audio_chunk(
                        chunk, audio_segment
                    )
                    if text:
                        await ws.send_text(text)
            except Exception as e:
                if not isinstance(e, WebSocketDisconnect):
                    print(f"Error handling websocket: {type(e)}: {e}")
                try:
                    await ws.close(code=1011, reason="Internal server error")
                except Exception as e:
                    print(f"Error closing websocket: {type(e)}: {e}")

        return web_app

    @modal.method()
    async def run_with_queue(self, q: modal.Queue):
        from pydub import AudioSegment

        # initialize an empty audio segment
        audio_segment = AudioSegment.empty()

        try:
            while True:
                # receive a chunk of audio data and convert it to an audio segment
                chunk = await q.get.aio(partition="audio")

                if chunk == END_OF_STREAM:
                    await q.put.aio(END_OF_STREAM, partition="transcription")
                    break

                audio_segment, text = await self.handle_audio_chunk(
                    chunk, audio_segment
                )
                if text:
                    await q.put.aio(text, partition="transcription")
        except Exception as e:
            print(f"Error handling queue: {type(e)}: {e}")
            return

    async def handle_audio_chunk(
        self,
        chunk: bytes,
        audio_segment,
        silence_thresh=-45,  # dB
        min_silence_len=1000,  # ms
    ):
        from pydub import AudioSegment, silence

        new_audio_segment = AudioSegment(
            data=chunk,
            channels=1,
            sample_width=2,
            frame_rate=TARGET_SAMPLE_RATE,
        )

        # append the new audio segment to the existing audio segment
        audio_segment += new_audio_segment

        # detect windows of silence
        silent_windows = silence.detect_silence(
            audio_segment,
            min_silence_len=min_silence_len,
            silence_thresh=silence_thresh,
        )

        # if there are no silent windows, continue
        if len(silent_windows) == 0:
            return audio_segment, None

        # get the last silent window because
        # we want to transcribe until the final pause
        last_window = silent_windows[-1]

        # if the entire audio segment is silent, reset the audio segment
        if last_window[0] == 0 and last_window[1] == len(audio_segment):
            audio_segment = AudioSegment.empty()
            return audio_segment, None

        # get the segment to transcribe: beginning until last pause
        segment_to_transcribe = audio_segment[: last_window[1]]

        # remove the segment to transcribe from the audio segment
        audio_segment = audio_segment[last_window[1] :]
        try:
            text = self.transcribe(segment_to_transcribe.raw_data)
            return audio_segment, text
        except Exception as e:
            print("❌ Transcription error:", e)
            raise e
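If you deploy the app, you can also stream audio to the WebSocket endpoint from your own Python code rather than the browser frontend. The sketch below is illustrative, not part of this example: the URL is a placeholder for your deployment’s endpoint, the websockets package is an extra dependency you’d install locally, and it reuses preprocess_audio, chunk_audio, and CHUNK_SIZE, which are defined further down in this file.

import websockets  # third-party client library, not a dependency of this example

WS_URL = "wss://your-workspace--example-parakeet-parakeet-web.modal.run/ws"  # placeholder URL


async def stream_wav(path: str):
    # the handler above interprets incoming bytes as 16 kHz, mono, 16-bit PCM
    pcm = preprocess_audio(open(path, "rb").read())
    async with websockets.connect(WS_URL) as ws:
        for chunk in chunk_audio(pcm, CHUNK_SIZE):
            await ws.send(chunk)
        await ws.send(END_OF_STREAM)
        while True:
            message = await ws.recv()
            if message == END_OF_STREAM:  # the server echoes the end-of-stream marker
                break
            print(message)


# asyncio.run(stream_wav("my_audio.wav"))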

Running transcription from a local Python client

Next, let’s test the model with a local_entrypoint that streams audio data to the server and prints out the transcriptions to our terminal as they arrive.

Instead of using the WebSocket endpoint like the browser frontend, we’ll use a modal.Queue to pass audio data and transcriptions between our local machine and the GPU container.

AUDIO_URL = "https://github.com/voxserv/audio_quality_testing_samples/raw/refs/heads/master/mono_44100/156550__acclivity__a-dream-within-a-dream.wav"
TARGET_SAMPLE_RATE = 16_000
CHUNK_SIZE = 16_000  # bytes per chunk; at 16 kHz, 16-bit mono, this is half a second of audio


@app.local_entrypoint()
async def main(audio_url: str = AUDIO_URL):
    from urllib.request import urlopen

    print(f"🌐 Downloading audio file from {audio_url}")
    audio_bytes = urlopen(audio_url).read()
    print(f"🎧 Downloaded {len(audio_bytes)} bytes")

    audio_data = preprocess_audio(audio_bytes)

    print("🎤 Starting Transcription")
    with modal.Queue.ephemeral() as q:
        Parakeet().run_with_queue.spawn(q)
        send = asyncio.create_task(send_audio(q, audio_data))
        recv = asyncio.create_task(receive_text(q))
        await asyncio.gather(send, recv)
    print("✅ Transcription complete!")

Below are the two functions that coordinate streaming audio and receiving transcriptions.

send_audio transmits chunks of audio data with a slight delay, as though it were being streamed from a live source, like a microphone. receive_text waits for transcribed text to arrive and prints it.

async def send_audio(q, audio_bytes):
    for chunk in chunk_audio(audio_bytes, CHUNK_SIZE):
        await q.put.aio(chunk, partition="audio")
        await asyncio.sleep(CHUNK_SIZE / TARGET_SAMPLE_RATE / 8)
    await q.put.aio(END_OF_STREAM, partition="audio")


async def receive_text(q):
    while True:
        message = await q.get.aio(partition="transcription")
        if message == END_OF_STREAM:
            break

        print(message)

Addenda

The remainder of the code in this example is boilerplate, mostly for handling Parakeet’s input format.

def preprocess_audio(audio_bytes: bytes) -> bytes:
    import array
    import io
    import wave

    with wave.open(io.BytesIO(audio_bytes), "rb") as wav_in:
        n_channels = wav_in.getnchannels()
        sample_width = wav_in.getsampwidth()
        frame_rate = wav_in.getframerate()
        n_frames = wav_in.getnframes()
        frames = wav_in.readframes(n_frames)

    # Convert frames to array based on sample width
    if sample_width == 1:
        audio_data = array.array("B", frames)  # unsigned char
    elif sample_width == 2:
        audio_data = array.array("h", frames)  # signed short
    elif sample_width == 4:
        audio_data = array.array("i", frames)  # signed int
    else:
        raise ValueError(f"Unsupported sample width: {sample_width}")

    # Downmix to mono if needed
    if n_channels > 1:
        mono_data = array.array(audio_data.typecode)
        for i in range(0, len(audio_data), n_channels):
            chunk = audio_data[i : i + n_channels]
            mono_data.append(sum(chunk) // n_channels)
        audio_data = mono_data

    # Resample to 16kHz if needed
    if frame_rate != TARGET_SAMPLE_RATE:
        ratio = TARGET_SAMPLE_RATE / frame_rate
        new_length = int(len(audio_data) * ratio)
        resampled_data = array.array(audio_data.typecode)

        for i in range(new_length):
            # Linear interpolation
            pos = i / ratio
            pos_int = int(pos)
            pos_frac = pos - pos_int

            if pos_int >= len(audio_data) - 1:
                sample = audio_data[-1]
            else:
                sample1 = audio_data[pos_int]
                sample2 = audio_data[pos_int + 1]
                sample = int(sample1 + (sample2 - sample1) * pos_frac)

            resampled_data.append(sample)

        audio_data = resampled_data

    return audio_data.tobytes()


def chunk_audio(data: bytes, chunk_size: int):
    for i in range(0, len(data), chunk_size):
        yield data[i : i + chunk_size]


class NoStdStreams(object):
    def __init__(self):
        self.devnull = open(os.devnull, "w")

    def __enter__(self):
        self._stdout, self._stderr = sys.stdout, sys.stderr
        self._stdout.flush(), self._stderr.flush()
        sys.stdout, sys.stderr = self.devnull, self.devnull

    def __exit__(self, exc_type, exc_value, traceback):
        sys.stdout, sys.stderr = self._stdout, self._stderr
        self.devnull.close()