
Run Facebook’s Segment Anything Model 2 (SAM 2) on Modal

This example demonstrates how to deploy Facebook’s Segment Anything Model 2 (SAM 2) on Modal. SAM 2 is a flexible image and video segmentation model that can be used for computer vision tasks like object detection and instance segmentation, or as a foundation for more complex computer vision applications. It extends the capabilities of the original SAM to include video segmentation.

In particular, this example segments a video of a man jumping off a cliff.

The output is a version of the input video with the predicted segmentation mask overlaid on the jumper.

Set up dependencies for SAM 2

First, we set up the necessary dependencies, including torch, opencv, huggingface_hub, torchvision, and the sam2 library.

We also install ffmpeg, which we use to manipulate videos, along with ffmpeg-python, a Python wrapper that provides a clean interface to it.

from pathlib import Path

import modal

MODEL_TYPE = "facebook/sam2-hiera-large"
SAM2_GIT_SHA = "c2ec8e14a185632b0a5d8b161928ceb50197eddc"  # pin commit! research code is fragile

image = (
    modal.Image.debian_slim(python_version="3.10")
    .apt_install("git", "wget", "python3-opencv", "ffmpeg")
    .pip_install(
        "torch~=2.4.1",
        "torchvision==0.19.1",
        "opencv-python==4.10.0.84",
        "pycocotools~=2.0.8",
        "matplotlib~=3.9.2",
        "onnxruntime==1.19.2",
        "onnx==1.17.0",
        "huggingface_hub==0.25.2",
        "ffmpeg-python==0.2.0",
        f"git+https://github.com/facebookresearch/sam2.git@{SAM2_GIT_SHA}",
    )
)
app = modal.App("sam2-app", image=image)

Wrapping the SAM 2 model in a Modal class

Next, we define the Model class that handles SAM 2 video segmentation.

We use the @modal.build() and @modal.enter() decorators here as an optimization: they prevent us from re-downloading or re-initializing the model on every call.

@modal.build() ensures this method runs during the container build process, downloading the model only once and caching it in the container image.

@modal.enter() makes sure the method runs only once when a new container starts, initializing the model and placing it on the GPU.

volume = modal.Volume.from_name("sam2-inputs", create_if_missing=True)


@app.cls(gpu="A100", volumes={"/root/videos": volume})
class Model:
    @modal.build()
    @modal.enter()
    def initialize_model(self):
        """Download and initialize model."""
        from sam2.sam2_video_predictor import SAM2VideoPredictor

        self.video_predictor = SAM2VideoPredictor.from_pretrained(MODEL_TYPE)

    @modal.method()
    def generate_video_masks(
        self, video="/root/videos/input.mp4", point_coords=None
    ):
        """Generate masks for a video."""
        import ffmpeg
        import numpy as np
        import torch
        from PIL import Image

        frames_dir = convert_video_to_frames(video)

        # scan all the JPEG files in this directory
        frame_names = [
            p
            for p in frames_dir.iterdir()
            if p.suffix in [".jpg", ".jpeg", ".JPG", ".JPEG"]
        ]
        frame_names.sort(key=lambda p: int(p.stem))

        # We are hardcoding the input point and label here
        # In a real-world scenario, you would want to display the video
        # and allow the user to click on the video to select the point
        if point_coords is None:
            width, height = Image.open(frame_names[0]).size
            point_coords = [[width // 2, height // 2]]

        points = np.array(point_coords, dtype=np.float32)
        # for labels, `1` means positive click and `0` means negative click
        labels = np.array([1] * len(points), np.int32)

        # run the model on GPU
        with torch.inference_mode(), torch.autocast(
            "cuda", dtype=torch.bfloat16
        ):
            self.inference_state = self.video_predictor.init_state(
                video_path=str(frames_dir)
            )

            # add new prompts and instantly get the output on the same frame
            (
                frame_idx,
                object_ids,
                masks,
            ) = self.video_predictor.add_new_points_or_box(
                inference_state=self.inference_state,
                frame_idx=0,
                obj_id=1,
                points=points,
                labels=labels,
            )

            print(
                f"frame_idx: {frame_idx}, object_ids: {object_ids}, masks: {masks}"
            )

            # run propagation throughout the video and collect the results in a dict
            video_segments = {}  # video_segments contains the per-frame segmentation results
            for (
                out_frame_idx,
                out_obj_ids,
                out_mask_logits,
            ) in self.video_predictor.propagate_in_video(self.inference_state):
                video_segments[out_frame_idx] = {
                    out_obj_id: (out_mask_logits[i] > 0.0).cpu().numpy()
                    for i, out_obj_id in enumerate(out_obj_ids)
                }

        out_dir = Path("/root/mask_frames")
        out_dir.mkdir(exist_ok=True)

        vis_frame_stride = 5  # visualize every 5th frame
        save_segmented_frames(
            video_segments,
            frames_dir,
            out_dir,
            frame_names,
            stride=vis_frame_stride,
        )

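        # stitch the saved visualization frames back into an MP4;
        # the frame rate is reduced by the same stride used when saving frames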
        ffmpeg.input(
            f"{out_dir}/frame_*.png",
            pattern_type="glob",
            framerate=30 / vis_frame_stride,
        ).filter(
            "scale",
            "trunc(iw/2)*2",
            "trunc(ih/2)*2",  # round to even dimensions to encode for "dumb players", https://trac.ffmpeg.org/wiki/Encode/H.264#Encodingfordumbplayers
        ).output(
            str(out_dir / "out.mp4"), format="mp4", pix_fmt="yuv420p"
        ).run()

        return (out_dir / "out.mp4").read_bytes()

Segmenting videos from the command line

Finally, we define a local_entrypoint to run the segmentation from our local machine’s terminal.

There are several ways to pass files between the local machine and the Modal Function.

One way is to upload the files onto a Modal Volume, which acts as a distributed filesystem.

The other way is to convert the file to bytes and pass the bytes back and forth as the input or output of Python functions. We use this method to get the video file with the segmentation results in it back to the local machine.
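
You could also have the Function write its results back to the Volume and pull them down afterwards, for example with the modal volume get CLI command, but returning bytes keeps this example self-contained.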

@app.local_entrypoint()
def main(
    input_video=Path(__file__).parent / "cliff_jumping.mp4",
    x_point=250,
    y_point=200,
):
    with volume.batch_upload(force=True) as batch:
        batch.put_file(input_video, "input.mp4")

    model = Model()

    if x_point is not None and y_point is not None:
        point_coords = [[x_point, y_point]]
    else:
        point_coords = None

    print(f"Running SAM 2 on {input_video}")
    video_bytes = model.generate_video_masks.remote(point_coords=point_coords)

    out_dir = Path("/tmp/sam2_outputs")
    out_dir.mkdir(exist_ok=True, parents=True)
    output_path = out_dir / "segmented_video.mp4"
    output_path.write_bytes(video_bytes)
    print(f"Saved output video to {output_path}")

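To try it yourself, invoke this entrypoint from your terminal with modal run, optionally overriding the prompt point, for example modal run sam2_video.py --x-point 250 --y-point 200 (where sam2_video.py is whatever filename you saved this example under).
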
Helper functions for SAM 2 inference

Above, we used some helper functions to handle some of the details, like breaking the video into frames. These are defined below.

def convert_video_to_frames(input_video="/root/videos/input.mp4"):
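    """Split a video into numbered JPEG frames with ffmpeg and return the frame directory."""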
    import ffmpeg

    input_video = Path(input_video)
    output_dir = (  # write frames to the container's local filesystem, not the remote Volume
        input_video.parent.parent / input_video.stem / "video_frames"
    )
    output_dir.mkdir(exist_ok=True, parents=True)

    ffmpeg.input(input_video).output(
        f"{output_dir}/%05d.jpg", qscale=2, start_number=0
    ).run()

    return output_dir


def show_mask(mask, ax, obj_id=None, random_color=False):
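    """Overlay a single object's segmentation mask on a matplotlib Axes."""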
    import matplotlib.pyplot as plt
    import numpy as np

    if random_color:
        color = np.concatenate([np.random.random(3), np.array([0.6])], axis=0)
    else:
        cmap = plt.get_cmap("tab10")
        cmap_idx = 0 if obj_id is None else obj_id
        color = np.array([*cmap(cmap_idx)[:3], 0.6])
    h, w = mask.shape[-2:]
    mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
    ax.imshow(mask_image)


def save_segmented_frames(
    video_segments, frames_dir, out_dir, frame_names, stride=5
):
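    """Render every `stride`-th frame with its masks overlaid and save the results as PNGs in `out_dir`."""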
    import io

    import matplotlib.pyplot as plt
    from PIL import Image

    frames_dir, out_dir = Path(frames_dir), Path(out_dir)

    frame_images = []
    inches_per_px = 1 / plt.rcParams["figure.dpi"]
    for out_frame_idx in range(0, len(frame_names), stride):
        frame = Image.open(frames_dir / frame_names[out_frame_idx])
        width, height = frame.size
        width, height = width - width % 2, height - height % 2
        fig, ax = plt.subplots(
            figsize=(width * inches_per_px, height * inches_per_px)
        )
        ax.axis("off")
        ax.imshow(frame)

        for obj_id, mask in video_segments[out_frame_idx].items():
            show_mask(mask, ax, obj_id=obj_id)

        # Convert plot to PNG bytes
        buf = io.BytesIO()
        fig.savefig(buf, format="png", bbox_inches="tight", pad_inches=0)
        buf.seek(0)
        frame_images.append(buf.getvalue())
        plt.close(fig)

    for ii, frame in enumerate(frame_images):
        (out_dir / f"frame_{str(ii).zfill(3)}.png").write_bytes(frame)
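
If you deploy this app with modal deploy rather than running it ephemerally with modal run, you can also call the segmentation method from other Python code. Below is a minimal sketch, assuming the app has been deployed under the sam2-app name used above and that an input video has already been uploaded to the sam2-inputs Volume as input.mp4; on newer Modal versions, modal.Cls.from_name may be preferred over modal.Cls.lookup.

from pathlib import Path

import modal

# look up the deployed class and construct a handle to call its method remotely
SAM2Model = modal.Cls.lookup("sam2-app", "Model")
video_bytes = SAM2Model().generate_video_masks.remote(point_coords=[[250, 200]])
Path("segmented_video.mp4").write_bytes(video_bytes)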