Dicts and Queues
Modal Dicts and Queues let you store objects and pass them between the components of your distributed applications on Modal.
import modal

app = modal.App()

kv = modal.Dict.from_name("kv", create_if_missing=True)
fifo = modal.Queue.from_name("fifo", create_if_missing=True)


@app.local_entrypoint()
def main(flush=True):
    n = kv.get("count", 0) + 1
    kv["count"] = n
    fifo.put(n)
    if flush:
        consume_queue.remote()


@app.function()
def consume_queue():
    while item := fifo.get(block=False):
        print(item)
Modal Dicts are Python dicts in the cloud
A Dict provides distributed key-value storage to your Modal Apps. Much like a standard Python dictionary, it lets you store and retrieve values using keys. However, unlike a regular dictionary, a Dict in Modal is accessible from anywhere, concurrently and in parallel.
# create a persistent Dict
dictionary = modal.Dict.from_name("my-dict", create_if_missing=True)


@app.local_entrypoint()
def main():
    dictionary["key"] = "value"  # setting a value
    value = dictionary["key"]  # getting a value
Dicts are persisted, which means that the data in the dictionary is stored and can be retrieved later, even after the application is redeployed.
You can access Modal Dicts asynchronously
Modal Dicts live in the cloud, which means reads and writes against them go over the network. That has some unavoidable latency overhead, relative to just reading from memory, of a few dozen milliseconds. Reads from Dicts via ["key"]-style indexing are synchronous, which means that latency is often directly felt by the application. But like all Modal objects, you can also interact with Dicts asynchronously by putting the .aio suffix on methods (in this case, put and get, which are synonyms for bracket-based indexing). Just add the async keyword to your local_entrypoints or remote Functions.
import modal

app = modal.App()

dictionary = modal.Dict.from_name("my-dict", create_if_missing=True)


@app.local_entrypoint()
async def main():
    await dictionary.put.aio("key", "value")  # setting a value asynchronously
    assert await dictionary.get.aio("key")  # getting a value asynchronously
See the guide to asynchronous functions for more information.
Modal Dicts are not exactly Python dicts
Python dicts can have keys of any hashable type and values of any type. You can store Python objects of nearly any serializable type within Dicts as keys or values. Objects are serialized using cloudpickle, so precise support is inherited from that library. cloudpickle can serialize a surprising variety of objects, like lambda functions or even Python modules, but it can't serialize a few things that don't really make sense to serialize, like live system resources (sockets, writable file descriptors). Note that you will need to have the library defining the type installed in the environment where you retrieve the object so that it can be deserialized.
dictionary = modal.Dict.from_name("my-dict", create_if_missing=True)


@app.function(image=modal.Image.debian_slim().pip_install("torch"))
def fill():
    import torch

    dictionary["torch"] = torch
    dictionary["modal"] = modal
    dictionary[dictionary] = dictionary  # don't try this at home!


@app.local_entrypoint()
def main():
    fill.remote()
    print(dictionary["modal"])
    print(dictionary[dictionary]["modal"].Dict)
    # print(dictionary["torch"])  # DeserializationError, no torch locally
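If you're unsure whether an object will survive the round trip, you can check serializability locally with cloudpickle itself before putting it into a Dict. A minimal sketch, assuming cloudpickle is installed (it ships as a dependency of the modal client):

```python
import socket

import cloudpickle

# Lambdas round-trip fine through cloudpickle, unlike with the stdlib pickle.
double = cloudpickle.loads(cloudpickle.dumps(lambda x: x * 2))
print(double(21))  # 42

# Live system resources do not: pickling an open socket raises TypeError.
s = socket.socket()
try:
    cloudpickle.dumps(s)
except TypeError as exc:
    print(f"not serializable: {exc}")
finally:
    s.close()
```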
Unlike with normal Python dictionaries, updates to mutable value types will not be reflected in other containers unless the updated object is explicitly put back into the Dict. As a consequence, patterns like chained updates (my_dict["outer_key"]["inner_key"] = value) cannot be used the same way as they would with a local dictionary.
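The workaround is an explicit read-modify-write cycle. Here is a minimal sketch of the pattern, using a plain Python dict as a local stand-in for a modal.Dict (the shape of the code is the same either way; update_nested is a hypothetical helper, not part of the Modal API):

```python
def update_nested(d, outer_key, inner_key, value):
    """Read-modify-write: fetch the whole stored value, mutate the local
    copy, then put it back so other containers can see the change."""
    inner = d[outer_key]      # with a modal.Dict, this is a network read
    inner[inner_key] = value  # this mutates only the local copy
    d[outer_key] = inner      # writing back makes the update visible

store = {}  # stand-in for modal.Dict.from_name("my-dict", create_if_missing=True)
store["outer_key"] = {"inner_key": 0}
update_nested(store, "outer_key", "inner_key", 42)
print(store["outer_key"])  # {'inner_key': 42}
```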
Currently, the per-object size limit is 100 MiB and the maximum number of entries per update is 10,000. It’s recommended to use Dicts for smaller objects (under 5 MiB). Each object in the Dict will expire after 7 days of inactivity (no reads or writes).
Dicts also provide a locking primitive. See this blog post for details.
Modal Queues
A Queue in Modal is a distributed queue-like object. It allows you to add and retrieve items in a first-in-first-out (FIFO) manner. Queues are particularly useful when you want to handle tasks or process data asynchronously, or when you need to pass messages between different components of your distributed system.
import modal

app = modal.App()

my_queue = modal.Queue.from_name("my-persisted-queue", create_if_missing=True)


@app.local_entrypoint()
def main():
    my_queue.put("some object")  # adding a value
    value = my_queue.get()  # retrieving a value
Similar to Dicts, Queues are also persisted and support values of any type.
Queue partitions
Queues are split into separate FIFO partitions via a string key. By default, one partition (corresponding to an empty key) is used.
A single Queue can contain up to 100,000 partitions, each with up to 5,000 items. Each item can be up to 1 MiB. These limits also apply to the default partition.
import modal

app = modal.App()

my_queue = modal.Queue.from_name("my-persisted-queue", create_if_missing=True)


@app.local_entrypoint()
def main():
    my_queue.put("some value")
    my_queue.put(123)

    assert my_queue.get() == "some value"
    assert my_queue.get() == 123

    my_queue.put(0)
    my_queue.put(1, partition="foo")
    my_queue.put(2, partition="bar")

    # Default and "foo" partition are ignored by the get operation.
    assert my_queue.get(partition="bar") == 2

    # Set custom 10s expiration time on "foo" partition.
    my_queue.put(3, partition="foo", partition_ttl=10)

    # (beta feature) Iterate through items in place (read immutably)
    my_queue.put(1)
    assert [v for v in my_queue.iterate()] == [0, 1]
By default, each partition is cleared 24 hours after the last put operation. A lower TTL can be specified via the partition_ttl argument in the put or put_many methods. Each partition's expiry is handled independently. As such, Queues are best used for communication between active functions and not relied on for persistent storage.
Asynchronous calls
Queues are synchronous by default, but they support asynchronous interaction with the .aio method suffix.
@app.local_entrypoint()
async def main(value=None):
    value = value or 200
    await my_queue.put.aio(value)  # adding a value asynchronously
    assert await my_queue.get.aio() == value  # retrieving a value asynchronously
See the guide to asynchronous functions for more information.
Example: Dict and Queue Interaction
To illustrate how Dicts and Queues can interact in a simple distributed system, consider the following example program that crawls the web, starting from wikipedia.org and traversing links to many sites in breadth-first order. The Queue stores pages to crawl, while the Dict is used as a kill switch to stop execution of tasks immediately upon completion.
import queue
import sys
from datetime import datetime

import modal

app = modal.App(
    image=modal.Image.debian_slim().pip_install("requests", "beautifulsoup4")
)


def extract_links(url: str) -> list[str]:
    """Extract links from a given URL."""
    import urllib.parse

    import requests
    from bs4 import BeautifulSoup

    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    links = []
    for link in soup.find_all("a"):
        links.append(urllib.parse.urljoin(url, link.get("href")))
    return links
@app.function()
def crawl_pages(q: modal.Queue, d: modal.Dict, urls: set[str]) -> None:
    for url in urls:
        if "stop" in d:
            return
        try:
            s = datetime.now()
            links = extract_links(url)
            print(f"Crawled: {url} in {datetime.now() - s}, with {len(links)} links")
            q.put_many(links)
        except Exception as exc:
            print(f"Failed to crawl: {url} with error {exc}, skipping...", file=sys.stderr)
@app.function()
def scrape(url: str):
    start_time = datetime.now()

    # Create ephemeral dicts and queues
    with modal.Dict.ephemeral() as d, modal.Queue.ephemeral() as q:
        # The dict is used to signal the scraping to stop
        # The queue contains the URLs that have been crawled

        # Initialize queue with a starting URL
        q.put(url)

        # Crawl until the queue is empty, or reaching some number of URLs
        visited = set()
        max_urls = 50000
        while True:
            try:
                next_urls = q.get_many(2000, timeout=5)
            except queue.Empty:
                break
            new_urls = set(next_urls) - visited
            visited |= new_urls
            if len(visited) < max_urls:
                crawl_pages.spawn(q, d, new_urls)
            else:
                d["stop"] = True

    elapsed = (datetime.now() - start_time).total_seconds()
    print(f"Crawled {len(visited)} URLs in {elapsed:.2f} seconds")


@app.local_entrypoint()
def main():
    scrape.remote("https://www.wikipedia.org/")
Starting from Wikipedia, this spawns several dozen containers (auto-scaled on demand) to crawl over 200,000 URLs in 40 seconds.
Data durability
Dicts are backed by durable storage. Queues are backed by a replicated in-memory database, so data loss is possible but unlikely. Queues and Dicts are also subject to expiration, as described on the modal.Dict and modal.Queue reference pages.
Please get in touch if you need durability for Queue objects.