asyncio.gather(*tasks) schedules every awaitable immediately and runs them all concurrently. Usually that is fine. Sometimes you have 10,000 URLs to fetch, and running all of them at once melts something on the other side, hits the local file descriptor limit, or exhausts your own memory. The fix is a semaphore-gated wrapper.

import asyncio
from typing import Awaitable, Iterable, TypeVar

T = TypeVar("T")

async def bounded_gather(
    tasks: Iterable[Awaitable[T]],
    *,
    limit: int,
) -> list[T]:
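    # One semaphore shared by every runner; at most `limit` hold it at a time.
    sem = asyncio.Semaphore(limit)

    async def runner(coro: Awaitable[T]) -> T:
        async with sem:
            return await coro

    # The * unpacking consumes `tasks` right away, so every runner is
    # scheduled now; the semaphore only limits how many are doing work.
    # gather returns results in input order.
    return await asyncio.gather(*(runner(t) for t in tasks))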


# Usage:
async def fetch(url: str) -> bytes:
    ...

async def main() -> None:
    urls = [f"https://example.com/{i}" for i in range(10_000)]
    bodies = await bounded_gather((fetch(u) for u in urls), limit=32)
    print(len(bodies))

asyncio.run(main())

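Note that passing a generator expression does not make coroutine creation lazy. The * unpacking inside bounded_gather consumes the whole iterable before gather starts, so all 10,000 coroutine objects (and the tasks that wrap them) exist up front whether you pass (fetch(u) for u in urls) or [fetch(u) for u in urls]. That is usually harmless but not free. The semaphore limits how many coroutines are running at once, not how many exist.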
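If the cost of instantiating coroutines up front matters, one option is a fixed pool of workers that pull items from a shared iterator, so a coroutine is created only when a worker is free to run it. The following is a minimal sketch, not part of the snippet above: bounded_map and its func/items parameters are hypothetical names, and it takes a function plus its inputs rather than ready-made coroutines.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

A = TypeVar("A")
R = TypeVar("R")


async def bounded_map(
    func: Callable[[A], Awaitable[R]],
    items: Iterable[A],
    *,
    limit: int,
) -> list[R]:
    # Pair each item with its index so results can be returned in input order.
    pending = iter(enumerate(items))
    results: dict[int, R] = {}

    async def worker() -> None:
        # All workers share one iterator. A worker asks for the next item
        # only after finishing its current one, so at most `limit`
        # coroutines created by `func` exist at any moment.
        for index, item in pending:
            results[index] = await func(item)

    await asyncio.gather(*(worker() for _ in range(limit)))
    return [results[i] for i in range(len(results))]
```

With this shape the call site becomes bounded_map(fetch, urls, limit=32). If one call raises, gather propagates the exception while the other workers keep running, the same as with plain asyncio.gather.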

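For huge input lists, consume results with asyncio.as_completed instead, so you can process each result as it finishes rather than holding the full result set in memory. It still schedules every awaitable up front, so keep the semaphore, and results arrive in completion order rather than input order. See also /snippets/python-csv-stream-large/.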
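A streaming variant might look like the sketch below. bounded_as_completed is a hypothetical name, not something asyncio provides; it reuses the same semaphore gate and yields each result as soon as it is ready.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Iterable, TypeVar

T = TypeVar("T")


async def bounded_as_completed(
    aws: Iterable[Awaitable[T]],
    *,
    limit: int,
) -> AsyncIterator[T]:
    sem = asyncio.Semaphore(limit)

    async def runner(aw: Awaitable[T]) -> T:
        async with sem:
            return await aw

    # as_completed schedules every runner immediately; the semaphore is
    # what limits how many are doing work at once.
    for next_done in asyncio.as_completed([runner(aw) for aw in aws]):
        # Results are yielded in completion order, not input order.
        yield await next_done
```

Inside main() you would then write async for body in bounded_as_completed((fetch(u) for u in urls), limit=32): and handle each body before moving on, so only in-flight results are held at once.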