Use Modal Dicts and Queues together

Modal Dicts and Queues store and communicate objects in distributed applications on Modal.
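Both objects can be created ephemerally and driven with familiar dict- and queue-style operations. The following is a minimal sketch, separate from the crawler, of the calls the example below relies on (the keys and URLs are placeholders):

import modal

# Sketch: an ephemeral Dict and Queue with the basic operations used by the crawler.
with modal.Dict.ephemeral() as d, modal.Queue.ephemeral() as q:
    d["stop"] = False                  # store a value under a key
    print("stop" in d)                 # membership checks work like a dict
    q.put("https://example.com")       # enqueue a single item
    q.put_many(["https://example.org", "https://example.net"])  # enqueue a batch
    print(q.get(), q.get_many(2))      # dequeue one item, then up to two more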

To illustrate how Dicts and Queues can work together in a simple distributed system, consider the following example program, which crawls the web starting from an initial page and traverses links to many sites in breadth-first order.

The Modal Queue acts as a job queue: it accepts new pages as the crawlers discover them and doles them out to fresh crawler invocations via .spawn.

The Dict is used to coordinate termination: once the maximum number of URLs has been crawled, a stop flag is set that every crawler checks before taking on more work.
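The .spawn call launches a function in a fresh container and returns a FunctionCall handle immediately rather than blocking on the result; the crawler fires and forgets, while the stop flag in the Dict tells workers when to wind down. Here is a small self-contained sketch of that pattern, with illustrative names (spawn-sketch, work) that are not part of the crawler:

import modal

app = modal.App("spawn-sketch")


@app.function()
def work(d: modal.Dict, item: str) -> None:
    if "stop" in d:  # workers exit early once the coordinator sets the flag
        return
    print("processing", item)


@app.local_entrypoint()
def main():
    with modal.Dict.ephemeral() as d:
        call = work.spawn(d, "first item")  # .spawn returns a FunctionCall handle right away
        call.get(timeout=60)                # optionally block until the call finishes
        d["stop"] = True                    # signal later workers to exit early
        work.spawn(d, "second item").get()  # this call sees the flag and returns immediately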

Starting from Wikipedia, this program spawns several dozen containers (auto-scaled on demand) and crawls about 100,000 URLs per minute.

import queue
import sys
from datetime import datetime

import modal

app = modal.App(
    image=modal.Image.debian_slim().pip_install(
        "requests~=2.32.4", "beautifulsoup4~=4.13.4"
    )
)


def extract_links(url: str) -> list[str]:
    """Extract links from a given URL."""
    import urllib.parse

    import requests
    from bs4 import BeautifulSoup

    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    links = []
    for link in soup.find_all("a"):
        links.append(urllib.parse.urljoin(url, link.get("href")))
    return links


@app.function()
def crawl_pages(q: modal.Queue, d: modal.Dict, urls: set[str]) -> None:
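    """Crawl each URL in `urls`, pushing any newly discovered links onto the shared Queue."""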
    for url in urls:
        if "stop" in d:
            return
        try:
            s = datetime.now()
            links = extract_links(url)
            print(f"Crawled: {url} in {datetime.now() - s}, with {len(links)} links")
            q.put_many(links)
        except Exception as exc:
            print(
                f"Failed to crawl: {url} with error {exc}, skipping...", file=sys.stderr
            )


@app.function()
def scrape(url: str, max_urls: int = 50_000):
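    """Coordinate the crawl: seed the Queue, fan out crawl_pages workers, and stop at max_urls."""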
    start_time = datetime.now()

    # Create an ephemeral Dict and Queue, scoped to this run
    with modal.Dict.ephemeral() as d, modal.Queue.ephemeral() as q:
        # The Dict is used to signal the crawlers to stop
        # The Queue holds discovered URLs that are waiting to be crawled

        # Initialize queue with a starting URL
        q.put(url)

        # Crawl until the queue is empty or the URL limit is reached
        visited = set()
        max_urls = min(max_urls, 50_000)
        while True:
            try:
                next_urls = q.get_many(2000, timeout=5)
            except queue.Empty:
                break
            new_urls = set(next_urls) - visited
            visited |= new_urls
            if len(visited) < max_urls:
                crawl_pages.spawn(q, d, new_urls)
            else:
                d["stop"] = True

        elapsed = (datetime.now() - start_time).total_seconds()
        print(f"Crawled {len(visited)} URLs in {elapsed:.2f} seconds")


@app.local_entrypoint()
def main(starting_url=None, max_urls: int = 10_000):
    starting_url = starting_url or "https://www.wikipedia.org/"
    scrape.remote(starting_url, max_urls=max_urls)
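
To run the crawler, save the program to a file (the name crawler.py here is just an example) and launch it with the Modal CLI. Local entrypoint parameters should map to command-line flags, so something like the following kicks off a run:

modal run crawler.py --starting-url https://www.wikipedia.org/ --max-urls 5000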