Dicts and queues

Modal provides a variety of distributed objects to enable seamless interactivity and data transfer across different components of a distributed system. Two key objects are dicts and queues, both of which serve specific roles in facilitating communication and data management in your applications.

A Dict in Modal provides distributed key-value storage. Much like a standard Python dictionary, it lets you store and retrieve values using keys. However, unlike a regular dictionary, a Modal Dict is shared across all containers of an application and can be accessed and manipulated concurrently from any of them.

import modal

stub = modal.Stub()
stub.my_dict = modal.Dict.new()


@stub.local_entrypoint()
def main():
    stub.my_dict["key"] = "value"  # setting a value
    value = stub.my_dict["key"]    # getting a value

Dicts in Modal are persisted, which means that the data in the dictionary is stored and can be retrieved later, even after the application is redeployed. They can also be accessed from other Modal functions.
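For instance, a Modal function running in its own container sees the same Dict as the local entrypoint. A minimal sketch (the function name `read_value` is illustrative, and the exact remote-call syntax may vary by Modal version):

```python
import modal

stub = modal.Stub()
stub.my_dict = modal.Dict.new()


@stub.function()
def read_value(key: str) -> str:
    # Runs in a remote container, but reads the same shared Dict
    return stub.my_dict[key]


@stub.local_entrypoint()
def main():
    stub.my_dict["key"] = "value"   # written locally
    print(read_value.remote("key")) # read from a remote container
```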

You can store Python values of any type within Dicts, since they’re serialized using cloudpickle.
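To see why this matters, cloudpickle can serialize objects that the standard pickle module rejects, such as lambdas and locally defined functions. A standalone sketch, independent of Modal:

```python
import cloudpickle

# A lambda cannot be serialized with the standard pickle module,
# but cloudpickle handles it by serializing the code object itself.
double = lambda x: x * 2

data = cloudpickle.dumps(double)    # bytes, safe to ship across containers
restored = cloudpickle.loads(data)  # reconstructed callable

print(restored(21))  # → 42
```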

A Queue in Modal is a distributed queue-like object. It allows you to add and retrieve items in a first-in-first-out (FIFO) manner. Queues are particularly useful when you want to handle tasks or process data asynchronously, or when you need to pass messages between different components of your distributed system.

import modal

stub = modal.Stub()
stub.my_queue = modal.Queue.new()


@stub.local_entrypoint()
def main():
    stub.my_queue.put("some object")  # adding a value
    value = stub.my_queue.get()       # retrieving a value

Similar to Dicts, Queues are also persisted and support values of any type.
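Queues also support batch operations, which the crawler example later in this page relies on: put_many enqueues several items in one call, and get_many dequeues up to a given number. A minimal sketch (the exact blocking and timeout semantics of get_many may vary by Modal version):

```python
import modal

stub = modal.Stub()
stub.my_queue = modal.Queue.new()


@stub.local_entrypoint()
def main():
    # Enqueue several items in a single round trip
    stub.my_queue.put_many(["a", "b", "c"])

    # Dequeue up to 10 items at once, in FIFO order
    items = stub.my_queue.get_many(10)
    print(items)
```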

Asynchronous calls

Both Dicts and Queues are synchronous by default, but they also support asynchronous interaction via the .aio method suffix.

@stub.local_entrypoint()
async def main():
    await stub.my_queue.put.aio(100)
    assert await stub.my_queue.get.aio() == 100

    await stub.my_dict.put.aio("hello", 400)
    assert await stub.my_dict.get.aio("hello") == 400

Note that .put and .get are method aliases for the overloaded indexing operators on Dicts, and you need to call them by name for asynchronous calls.

Please see the docs on asynchronous functions for more information.

Example: Dict and Queue Interaction

To illustrate how Dicts and Queues can interact in a simple distributed system, consider the following example program, which crawls the web starting from wikipedia.org and traverses links to many sites in breadth-first order. The Queue stores pages to crawl, while the Dict is used as a kill switch that stops running tasks as soon as enough URLs have been visited.

import queue
import sys
from datetime import datetime

from modal import Dict, Image, Queue, Stub


stub = Stub()
stub.image = Image.debian_slim().pip_install("requests", "beautifulsoup4")

stub.signal = Dict.new()  # Used to signal the app to stop
stub.queue = Queue.new()  # Stream of URLs that have been crawled


def extract_links(url: str) -> list[str]:
    """Extract links from a given URL."""
    import requests
    import urllib.parse
    from bs4 import BeautifulSoup

    resp = requests.get(url)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    links = []
    for link in soup.find_all("a"):
        links.append(urllib.parse.urljoin(url, link.get("href")))
    return links


@stub.function()
def crawl_pages(urls: set[str]) -> None:
    for url in urls:
        if "stop" in stub.signal:
            return
        try:
            s = datetime.now()
            links = extract_links(url)
            print(f"Crawled: {url} in {datetime.now() - s}, with {len(links)} links")
            stub.queue.put_many(links)
        except Exception as exc:
            print(f"Failed to crawl: {url} with error {exc}, skipping...", file=sys.stderr)


@stub.local_entrypoint()
def main():
    start_time = datetime.now()

    # Initialize queue with a starting URL
    stub.queue.put("https://www.wikipedia.org/")

    # Crawl until the queue is empty, or until a maximum number of URLs has been visited
    visited = set()
    max_urls = 50000
    while True:
        try:
            next_urls = stub.queue.get_many(2000, timeout=5)
        except queue.Empty:
            break
        new_urls = set(next_urls) - visited
        visited |= new_urls
        if len(visited) < max_urls:
            crawl_pages.spawn(new_urls)
        else:
            stub.signal["stop"] = True

    elapsed = (datetime.now() - start_time).total_seconds()
    print(f"Crawled {len(visited)} URLs in {elapsed:.2f} seconds")

Starting from Wikipedia, this spawns several dozen containers (auto-scaled on demand) to crawl over 200,000 URLs in 40 seconds.