Creating and Managing Tasks with Asyncio


Asyncio lets developers write asynchronous programs in Python with little ceremony. The module also provides many ways to create and manage asynchronous tasks, and with so many options it can be unclear which one to use.

In this article, we will discuss the many ways you can create and manage tasks with asyncio.



What is an asyncio task?

In asyncio, a task is an object that wraps a coroutine and schedules it to run within the event loop. Simply put, a task is a way to run a coroutine concurrently with other tasks. Once a task is created, the event loop runs it, pausing and resuming it as necessary to allow other tasks to run.
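To make the idea concrete, here is a minimal sketch of two tasks sharing the event loop. The worker coroutine and the events list are illustrative names, not part of asyncio. The log shows that the second task starts while the first is still sleeping.

```python
import asyncio

events = []  # illustrative log of what ran, in order

async def worker(label, delay):
    events.append(f"{label} start")
    await asyncio.sleep(delay)  # yields control back to the event loop
    events.append(f"{label} end")

async def main():
    # Both tasks are scheduled right away; neither blocks the other
    slow = asyncio.create_task(worker("slow", 0.2))
    fast = asyncio.create_task(worker("fast", 0.05))
    await slow
    await fast

asyncio.run(main())
print(events)
```

Even though main awaits the slow task first, the fast task finishes earlier because the event loop resumes whichever task is ready.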



Methods for Creating and Managing Asyncio Tasks

Now, we can discuss methods for creating and managing tasks. First, to create a task in Python using asyncio, you use the asyncio.create_task function, which takes the following arguments:

  • coro (required): The coroutine object to be scheduled. This is the object returned by calling an async function, not the function itself.

  • name (optional): A name for the task that can be useful for debugging or logging purposes. You can assign a string to this parameter.

    • You can also set or get the name later using Task.set_name(name) and Task.get_name().
  • context (optional): Added in Python 3.11, this lets you pass a contextvars.Context for the coroutine to run in. If it is omitted, the task runs in a copy of the current context. Context variables give you task-local storage, similar to thread-local storage but for asyncio tasks.

Here is an example of the usage of asyncio.create_task:

import asyncio

# Define a coroutine
async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O-bound operation
    print(f"Hello, {name}!")

async def main():
    # Create tasks
    task1 = asyncio.create_task(greet("Alice"), name="GreetingAlice")
    task2 = asyncio.create_task(greet("Bob"), name="GreetingBob")

    # Check task names
    print(f"Task 1 name: {task1.get_name()}")
    print(f"Task 2 name: {task2.get_name()}")

    # Wait for both tasks to complete
    await task1
    await task2

# Run the main function
asyncio.run(main())

Once a task exists, you can call several methods on it, such as:

  • .cancel(): to cancel the task.

  • .add_done_callback(cb): to add a callback function that runs when the task is done.

  • .done(): to check if the task is completed.

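  • .result(): to retrieve the result of the task after it’s completed. Calling it on an unfinished task raises InvalidStateError, and calling it on a canceled task raises CancelledError.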
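The methods listed above can be sketched together as follows. The coroutine names, the on_done callback, and the finished list are illustrative assumptions, not asyncio APIs.

```python
import asyncio

finished = []  # names of tasks whose done-callback has fired

async def compute():
    await asyncio.sleep(0.05)
    return 42

async def never_finishes():
    await asyncio.sleep(3600)

def on_done(task):
    # The event loop calls this with the finished task as its argument
    finished.append(task.get_name())

async def main():
    ok = asyncio.create_task(compute(), name="ok")
    stuck = asyncio.create_task(never_finishes(), name="stuck")
    ok.add_done_callback(on_done)
    stuck.add_done_callback(on_done)

    value = await ok
    stuck.cancel()  # request cancellation
    try:
        await stuck  # wait until the cancellation has taken effect
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0)  # let any pending callbacks run
    return value, ok.done(), ok.result(), stuck.cancelled()

summary = asyncio.run(main())
print(summary, finished)
```

Note that the done-callback also fires for the canceled task, since a canceled task counts as done.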

Now that we understand how to create a task, let’s see how to handle waiting for one task or a multitude of tasks.



Waiting for Task Completion

In this section, we will discuss how to wait for one or many tasks to complete. Asynchronous programming lets the program keep executing while an asynchronous task is running. Sometimes, though, you want tighter control over the flow and need to be sure a result is available before the program continues.

To wait for a single task completion, you can use asyncio.wait_for. It takes two arguments:

  • awaitable (required): This is the coroutine, task, or future that you want to wait for. It can be any object that can be awaited, like a coroutine function call, an asyncio.Task, or an asyncio.Future.

  • timeout (optional): This specifies the maximum number of seconds to wait for the awaitable to complete. If the timeout is reached and the awaitable has not completed, asyncio.wait_for cancels it and raises TimeoutError (asyncio.TimeoutError before Python 3.11; from 3.11 on, the two names refer to the same class). If timeout is set to None, the function will wait indefinitely for the awaitable to complete.

Here is an example where this method is used:

import asyncio

async def slow_task():
    print("Task started...")
    await asyncio.sleep(5)  # Simulating a long-running task
    print("Task finished!")
    return "Completed"

async def main():
    try:
        # Wait for slow_task to finish within 2 seconds
        result = await asyncio.wait_for(slow_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("The task took too long and was canceled!")

asyncio.run(main())

In the code above, slow_task() is a coroutine that simulates a long-running task by sleeping for 5 seconds. The line asyncio.wait_for(slow_task(), timeout=2) waits for the task to complete but limits the wait to 2 seconds, causing a timeout since the task takes longer. When the timeout is exceeded, asyncio.wait_for cancels the task and raises a TimeoutError, which the except block handles by printing a message indicating the task took too long.
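The cancellation on timeout can be observed directly on a task object. The sketch below also shows asyncio.shield, which is one way to keep the underlying task alive when the wait times out. The short timeouts are chosen only to keep the example fast.

```python
import asyncio

async def slow_task():
    await asyncio.sleep(5)
    return "Completed"

async def main():
    task = asyncio.create_task(slow_task())
    try:
        await asyncio.wait_for(task, timeout=0.1)
    except asyncio.TimeoutError:
        pass
    # wait_for canceled the task before raising the timeout error
    cancelled_plain = task.cancelled()

    # shield() protects the inner task from that cancellation
    protected = asyncio.create_task(slow_task())
    try:
        await asyncio.wait_for(asyncio.shield(protected), timeout=0.1)
    except asyncio.TimeoutError:
        pass
    still_running = not protected.done()
    protected.cancel()  # clean up before leaving main
    return cancelled_plain, still_running

outcome = asyncio.run(main())
print(outcome)
```

With shield, the caller stops waiting after the timeout, but deciding what happens to the still-running task becomes the caller's responsibility.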

We can also wait for multiple or a group of tasks to complete. This is possible using asyncio.wait, asyncio.gather or asyncio.as_completed. Let’s explore each method.



asyncio.wait

The asyncio.wait function waits for a collection of tasks and returns two sets: one for completed tasks and one for pending tasks. It takes the following arguments:

  • aws (required, iterable of awaitables): A collection of tasks or futures that you want to wait for. Passing bare coroutine objects was deprecated in Python 3.8 and is an error from Python 3.11 on, so wrap coroutines with asyncio.create_task first.

  • timeout (float or None, optional): The maximum number of seconds to wait. If not provided, it waits indefinitely.

  • return_when (constant, optional): Specifies when asyncio.wait should return. Options include:

    • asyncio.ALL_COMPLETED (default): Returns when all tasks are complete.
    • asyncio.FIRST_COMPLETED: Returns when the first task is completed.
    • asyncio.FIRST_EXCEPTION: Returns when any task raises an exception. If no task raises, it behaves like asyncio.ALL_COMPLETED.

Let’s see how it is used in an example.

import asyncio
import random

async def task():
    await asyncio.sleep(random.uniform(1, 3))

async def main():
    tasks = [asyncio.create_task(task()) for _ in range(3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"Done tasks: {len(done)}, Pending tasks: {len(pending)}")

asyncio.run(main())

In the code above, asyncio.wait waits for a group of tasks and returns two sets: one with completed tasks and another with those still pending. You can control when it returns, such as after the first task is completed or after all tasks are done. In the example, asyncio.wait returns when the first task is completed, leaving the rest in the pending set.
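One detail worth illustrating: asyncio.wait does not cancel the tasks left in the pending set. A common pattern, sketched here with a hypothetical fetch coroutine and made-up delays, is to take the first result and then cancel the rest explicitly.

```python
import asyncio

async def fetch(label, delay):
    # Stand-in for real work; the delay is illustrative
    await asyncio.sleep(delay)
    return label

async def main():
    tasks = [
        asyncio.create_task(fetch("fast", 0.05)),
        asyncio.create_task(fetch("slow", 5)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    winner = done.pop().result()

    # wait() leaves pending tasks running, so cancel them ourselves
    for task in pending:
        task.cancel()
    # Let the cancellations finish without raising CancelledError here
    await asyncio.gather(*pending, return_exceptions=True)
    return winner, len(pending), all(t.cancelled() for t in pending)

report = asyncio.run(main())
print(report)
```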



asyncio.gather

The asyncio.gather function runs multiple awaitable objects concurrently and returns a list of their results, optionally handling exceptions. Let’s see the arguments it takes.

  • *aws (required, multiple awaitables): A variable number of awaitable objects (like coroutines, tasks, or futures) to run concurrently.

  • return_exceptions (bool, optional): If True, exceptions in the tasks will be returned as part of the results list instead of being raised.

Let’s see how it can be used in an example.

import asyncio
import random

async def task(id):
    await asyncio.sleep(random.uniform(1, 3))
    return f"Task {id} done"

async def main():
    results = await asyncio.gather(task(1), task(2), task(3))
    print(results)

asyncio.run(main())

In the code above, asyncio.gather runs multiple awaitable objects concurrently and returns a list of their results in the order they were passed in. It allows you to handle exceptions gracefully if return_exceptions is set to True. In the example, three tasks are run simultaneously, and their results are returned in a list once all tasks are complete.
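The effect of return_exceptions=True can be sketched as follows. The job coroutine is a hypothetical example in which the second job fails and later jobs finish sooner, to show that the result order still matches the argument order.

```python
import asyncio

async def job(n):
    await asyncio.sleep(0.01 * (3 - n))  # later jobs finish first
    if n == 2:
        raise ValueError(f"job {n} failed")
    return n * 10

async def main():
    # Exceptions are returned in place instead of being raised
    return await asyncio.gather(job(1), job(2), job(3), return_exceptions=True)

results = asyncio.run(main())
for r in results:
    if isinstance(r, Exception):
        print("failed:", r)
    else:
        print("ok:", r)
```

Without return_exceptions=True, the first exception would propagate out of the await, while the remaining awaitables would keep running in the background.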



asyncio.as_completed

The asyncio.as_completed function returns an iterator of awaitables. Awaiting each one gives the result of the next task to finish, so results can be processed as soon as they are available. It takes the following arguments:

  • aws (iterable of awaitables): A collection of coroutine objects, tasks, or futures.

  • timeout (float or None, optional): The maximum number of seconds to wait for all tasks to complete. If it elapses first, awaiting the next item raises TimeoutError. If not provided, it waits indefinitely.



Example

import asyncio
import random

async def task(id):
    await asyncio.sleep(random.uniform(1, 3))
    return f"Task {id} done"

async def main():
    tasks = [task(i) for i in range(3)]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

asyncio.run(main())

In the example above, asyncio.as_completed returns an iterator that yields results as each task completes, allowing you to process them immediately. This is useful when you want to handle results as soon as they’re available, rather than waiting for all tasks to finish. In the example, the tasks are run simultaneously, and their results are printed as each one finishes, in the order they complete.
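Because the example above uses random delays, its output order changes between runs. The following variant uses fixed, illustrative delays so the completion order is predictable.

```python
import asyncio

async def task(id, delay):
    await asyncio.sleep(delay)
    return id

async def main():
    delays = {0: 0.3, 1: 0.1, 2: 0.2}  # illustrative fixed delays
    coros = [task(i, d) for i, d in delays.items()]
    order = []
    for next_done in asyncio.as_completed(coros):
        # Each awaitable resolves to the next result to become available
        order.append(await next_done)
    return order

order = asyncio.run(main())
print(order)
```

The ids come back sorted by delay rather than by the order in which the coroutines were passed in.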

To summarize, use:

  • asyncio.wait: when you need to handle multiple tasks and want to track which tasks are completed and which are still pending. It’s useful when you care about the status of each task separately.

  • asyncio.gather: when you want to run multiple tasks concurrently and need the results in a list, especially when the order of results matters or you need to handle exceptions gracefully.

  • asyncio.as_completed: when you want to process results as soon as each task finishes, rather than waiting for all tasks to complete. It’s useful for handling results in the order they become available.

However, none of these functions manages a set of tasks as a single unit with built-in error handling. In the next section, we will look at asyncio.TaskGroup and how to use it to manage a group of tasks.



asyncio.TaskGroup

asyncio.TaskGroup is an asynchronous context manager introduced in Python 3.11 that simplifies managing multiple tasks as a group. It ensures that if any task within the group fails, all other tasks are canceled, providing a way to handle complex task management with robust error handling. The class has one method, create_task, used to create and add tasks to the task group. You pass a coroutine to this method, and it returns an asyncio.Task object that is managed by the group. When the block exits, the group waits for all of its tasks, and any failures are raised together as an ExceptionGroup.

Here is an example of how it is used:

import asyncio

async def task1():
    await asyncio.sleep(1)
    return "Task 1 done"

async def task2():
    await asyncio.sleep(2)
    return "Task 2 done"

async def task_with_error():
    await asyncio.sleep(1)
    raise ValueError("An error occurred")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            # Use names that do not shadow the coroutine functions
            t1 = tg.create_task(task1())
            t2 = tg.create_task(task2())
            error_task = tg.create_task(task_with_error())
    except* ValueError as eg:
        # TaskGroup wraps task failures in an ExceptionGroup
        for exc in eg.exceptions:
            print(f"Error: {exc}")

    # Print results only for tasks that finished; canceled tasks have none
    for label, t in (("Task 1", t1), ("Task 2", t2)):
        if t.cancelled():
            print(f"{label} was canceled")
        else:
            print(f"{label} result:", t.result())

asyncio.run(main())

asyncio.TaskGroup manages multiple tasks and ensures that if any task fails, all other unfinished tasks in the group are canceled. In the example, the failing task causes the group to cancel whatever is still running, the error surfaces as an ExceptionGroup, and only tasks that had already completed have a result to read.

One use for this is web scraping or calling several APIs at once. You can use asyncio.TaskGroup to handle multiple concurrent requests and ensure that if any request fails, all other requests are canceled to avoid working with incomplete data.

We are at the end of the article and we have learned the multiple methods asyncio provides to create and manage tasks. Here is a summary of the methods:

  • asyncio.wait_for: Wait for a task with a timeout.

  • asyncio.wait: Wait for multiple tasks with flexible completion conditions.

  • asyncio.gather: Aggregate multiple tasks into a single awaitable.

  • asyncio.as_completed: Handle tasks as they are completed.

  • asyncio.TaskGroup: Manage a group of tasks with automatic cancellation on failure.



Conclusion

Asynchronous programming can transform the way you handle concurrent tasks in Python, making your code more efficient and responsive. In this article, we’ve navigated through the various methods provided by asyncio to create and manage tasks, from simple timeouts to sophisticated task groups. Understanding when and how to use each method—asyncio.wait_for, asyncio.wait, asyncio.gather, asyncio.as_completed, and asyncio.TaskGroup—will help you harness the full potential of asynchronous programming, making your applications more robust and scalable.

For a deeper dive into asynchronous programming and more practical examples, explore our detailed guide here.

If you enjoyed this article, consider subscribing to my newsletter so you don’t miss out on future updates.

Happy coding!




By stp2y
