Multi-node clusters (beta)
Modal supports running a training job across several coordinated containers. Each container can saturate the available GPU devices on its host (a.k.a. node) and communicate with peer containers that do the same. By scaling a training job from a single GPU to 16 GPUs, you can achieve nearly a 16x improvement in training time.
Cluster compute capability
Modal H100 clusters provide:
- A 50 Gbps IPv6 private network for orchestration and dataset downloading.
- A 3200 Gbps RDMA scale-out network (RoCE).
- Up to 32 H100 SXM devices (64 coming soon!).
- At least 1TB of RAM and 4TB of local NVMe SSD per node.
- Deep burn-in testing.
- Interoperability with all Modal platform functionality (Volumes, Dicts, Tunnels, etc.).
This guide walks you through how the Modal client library enables multi-node training and integrates with torchrun.
@clustered
Unlike standard Modal serverless containers, containers in a multi-node training job must be able to:
- Perform fast, direct network communication between each other.
- Be scheduled together, all or nothing, at the same time.
The @clustered decorator enables this behavior.
import modal
import modal.experimental

app = modal.App()  # the App the clustered Function is attached to

@app.function(
    gpu="H100:8",
    timeout=60 * 60 * 24,
    retries=modal.Retries(initial_delay=0.0, max_retries=10),
)
@modal.experimental.clustered(size=2)
def train_model():
    cluster_info = modal.experimental.get_cluster_info()

    container_rank = cluster_info.rank
    world_size = len(cluster_info.container_ips)
    main_addr = cluster_info.container_ips[0]
    is_main = "(main)" if container_rank == 0 else ""

    print(f"{container_rank=} {is_main} {world_size=} {main_addr=}")

    ...
Applying this decorator under @app.function modifies the Function so that remote calls to it are serviced by a multi-node container group. The above configuration creates a group of two containers, each with 8 H100 GPU devices, for a total of 16 devices.
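For completeness, here is a minimal sketch of how such a call might be kicked off, reusing the app and train_model defined above (the entrypoint name is illustrative):

```python
@app.local_entrypoint()
def main():
    # A single .remote() call is serviced by the whole container group;
    # the caller receives one result, from the rank 0 container (see below).
    train_model.remote()
```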
Scheduling
A modal.experimental.clustered Function runs on multiple nodes in our cloud, but executes like a normal function call: all nodes are scheduled together (gang scheduling), so your code runs on all of the requested hardware or not at all.
Traditionally this kind of cluster and scheduling management would be handled by SLURM, Kubernetes, or manually. But with Modal it’s all provided serverlessly with just an application of the decorator!
Rank & input broadcast
You may notice above that a single .remote Function call created two input executions but returned only one output. This is how input-output is structured for multi-node training jobs on Modal: the Function call’s arguments are replicated to each container, but only the rank zero container’s output is returned to the caller.
A container’s rank is a key concept in multi-node training jobs. Rank zero, also known as the “main” container, is the leader and typically coordinates the job. Rank zero’s output will always be the output of a multi-node training run.
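As an illustration (not one of the guide’s examples), a clustered Function that simply returns its rank makes this behavior visible: every container runs the body, but the caller only ever sees rank zero’s return value. The app and function names below are hypothetical.

```python
import modal
import modal.experimental

app = modal.App()

@app.function(gpu="H100:8", timeout=60 * 60)
@modal.experimental.clustered(size=2)
def report_rank() -> int:
    # Every container in the cluster executes this body...
    rank = modal.experimental.get_cluster_info().rank
    print(f"running on rank {rank}")
    return rank

@app.local_entrypoint()
def main():
    # ...but the single .remote() call returns only rank zero's value.
    assert report_rank.remote() == 0
```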
Networking
Function containers cannot normally make direct network connections to other Function containers, but this is a requirement for multi-node training communication. So, along with gang scheduling, the @clustered decorator enables Modal’s workspace-private inter-container networking, called i6pn.
The cluster networking guide goes into more detail on i6pn, but the upshot is that each container in the cluster is made aware of the network address of all the other containers in the cluster, enabling them to communicate with each other quickly via TCP.
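For example, rank zero’s i6pn address can serve as the rendezvous point when bootstrapping torch.distributed by hand. The sketch below assumes one process per container and an arbitrary free port; the torchrun example later in this guide handles spawning one process per GPU instead.

```python
import os

import torch.distributed as dist

import modal.experimental

def init_distributed():
    cluster_info = modal.experimental.get_cluster_info()

    # Rank zero's i6pn address is reachable from every container in the cluster.
    os.environ["MASTER_ADDR"] = cluster_info.container_ips[0]
    os.environ["MASTER_PORT"] = "29500"  # arbitrary free port (assumption)

    dist.init_process_group(
        backend="gloo",  # connectivity check; real training typically uses "nccl" with one process per GPU
        rank=cluster_info.rank,
        world_size=len(cluster_info.container_ips),
    )
```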
RDMA (InfiniBand)
H100 clusters are equipped with InfiniBand, providing up to 3,200 Gbps of scale-out bandwidth for inter-node communication.
RDMA scale-out networking is enabled with the rdma parameter of modal.experimental.clustered:
@modal.experimental.clustered(size=2, rdma=True)
def train():
    ...
To run a simple InfiniBand RDMA performance test, see the example in the modal-examples repository.
Cluster Info
modal.experimental.get_cluster_info() exposes the following information about the cluster:
- rank: int is the container’s order within the cluster, starting from 0, the leader.
- container_ips: list[str] contains the IPv6 addresses of each container in the cluster, sorted by rank.
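As an illustration, these fields map onto the global rank and world size conventions used by most distributed training frameworks when running one process per GPU. The 8-GPUs-per-node figure below matches the examples in this guide, but is an assumption of the sketch:

```python
import modal.experimental

GPUS_PER_NODE = 8  # assumption: H100:8 nodes, as in the examples above

def global_rank(local_rank: int) -> int:
    # Rank of a GPU process across the whole cluster, given its rank on this node.
    info = modal.experimental.get_cluster_info()
    return info.rank * GPUS_PER_NODE + local_rank

def global_world_size() -> int:
    info = modal.experimental.get_cluster_info()
    return len(info.container_ips) * GPUS_PER_NODE
```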
Fault Tolerance
For a clustered Function, failures in inputs and containers are handled differently.
If an input fails on any container, this failure is not propagated to other containers in the cluster. Containers are responsible for detecting and responding to input failures on other containers.
Only rank 0’s output matters: if an input fails on the leader container (rank 0), the input is marked as failed, even if the input succeeds on another container. Similarly, if an input succeeds on the leader container but fails on another container, the input will still be marked as successful.
If a container in the cluster is preempted, Modal will terminate all remaining containers in the cluster, and retry the input.
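Because a preemption retries the whole run, it pairs naturally with checkpointing, for example to a modal.Volume, so a retried attempt can resume rather than start from scratch. A minimal sketch, where the Volume name and checkpoint layout are illustrative:

```python
import pathlib

import modal
import modal.experimental

app = modal.App()
checkpoints = modal.Volume.from_name("training-checkpoints", create_if_missing=True)  # hypothetical Volume

@app.function(
    gpu="H100:8",
    timeout=60 * 60 * 24,
    # If any node is preempted, the whole cluster is torn down and the input retried.
    retries=modal.Retries(initial_delay=0.0, max_retries=10),
    volumes={"/ckpt": checkpoints},
)
@modal.experimental.clustered(size=2)
def train_resumable():
    # On a retried attempt, pick up from the newest checkpoint left on the Volume.
    saved = sorted(pathlib.Path("/ckpt").glob("step-*.pt"))
    resume_from = saved[-1] if saved else None
    ...
```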
Input Synchronization
Important: synchronization is not relevant for single training runs, and applies mostly to inference use-cases.
Modal does not synchronize input execution across containers. Containers are responsible for ensuring that they do not process inputs faster than other containers in their cluster.
In particular, it is important that the leader container (rank 0) only starts processing the next input after all other containers have finished processing the current input.
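One way to enforce this, sketched below under the assumption that a torch.distributed process group has already been initialized across the cluster, is to end each input with a collective barrier so rank 0 cannot accept the next input before its peers have caught up:

```python
import torch.distributed as dist

def handle_input(batch):
    result = run_inference(batch)  # hypothetical per-container work

    # Every container blocks here until all peers finish this input, so the
    # leader (rank 0) only returns, and accepts the next input, once the
    # whole cluster is done.
    dist.barrier()
    return result
```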
Examples
To get hands-on with multi-node training, you can jump into the multinode-training-guide repository or the modal-examples repository and modal run something!
- Simple ‘hello world’ 4 X 1 H100 torch cluster example
- Infiniband RDMA performance test
- Use 2 x 8 H100s to train a ResNet50 model on the ImageNet dataset
- Speedrun GPT-2 training with modded-nanogpt
Torchrun Example
import modal
import modal.experimental

image = (
    modal.Image.debian_slim(python_version="3.12")
    .pip_install("torch~=2.5.1", "numpy~=2.2.1")
    .add_local_dir(
        "training", remote_path="/root/training"
    )
)
app = modal.App("example-simple-torch-cluster", image=image)

n_nodes = 4

@app.function(
    gpu="H100:8",
    timeout=60 * 60 * 24,
)
@modal.experimental.clustered(size=n_nodes)
def launch_torchrun():
    # import the 'torchrun' interface directly.
    from torch.distributed.run import parse_args, run

    cluster_info = modal.experimental.get_cluster_info()

    run(
        parse_args(
            [
                f"--nnodes={n_nodes}",
                f"--node_rank={cluster_info.rank}",
                f"--master_addr={cluster_info.container_ips[0]}",
                "--nproc-per-node=8",
                "--master_port=1234",
                "training/train.py",
            ]
        )
    )