This article was originally posted on my blog.
A few weeks ago, I was working on a Python script to extract books' metadata for a content-based recommender. After a couple of hours, I realized that I needed to make thousands of requests to the Google Books API to get the data. So I thought there had to be a way of speeding up the process.
As I enjoy learning, especially when it's also a chance to procrastinate on my goals, I decided to build a project using asyncio. Afterward, feeling guilty about the time wasted, I decided to write this tutorial with what I learned in the process.
This article aims to provide the basics of how to use asyncio for making asynchronous requests to an API. I focus mostly on the actual code and skip most of the theory (besides the short introduction below). However, if you are looking for a more in-depth introduction to asyncio, check the recommendations in the references[1].
asyncio is a Python library that allows you to execute some tasks in a seemingly concurrent[2] manner. It is commonly used in web servers and database connections. It is also useful for speeding up IO-bound tasks, like services that make many requests or spend a lot of time waiting for external APIs.
The essence of asyncio is that it allows the program to continue executing other instructions while waiting for specific processes to finish (e.g., a request to an API). In this tutorial, we will see how to use asyncio for accelerating a program that makes multiple requests to an API.
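To make that idea concrete before touching a real API, here is a minimal sketch that uses asyncio.sleep as a stand-in for network latency. The fake_request coroutine, its names, and its delays are made up for illustration and are not part of the programs built later in this tutorial.

```python
import asyncio


async def fake_request(name, delay, log):
    """Hypothetical stand-in for an API call; asyncio.sleep simulates the wait."""
    log.append(f"{name} started")
    await asyncio.sleep(delay)  # control goes back to the event loop here
    log.append(f"{name} finished")


async def main():
    log = []
    # Both coroutines are scheduled together, so the slow one
    # does not hold up the fast one.
    await asyncio.gather(
        fake_request("slow", 0.2, log),
        fake_request("fast", 0.1, log),
    )
    return log


# In a Jupyter Notebook, use "events = await main()" instead of asyncio.run.
events = asyncio.run(main())
print(events)
```

The "fast" request finishes while the "slow" one is still waiting, which is the behavior we will rely on when making real requests.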
So let's get down to business. To get the most out of this tutorial, try running the code yourself. You can copy and paste the code into a Jupyter Notebook and run it without modifications. Just remember to first install the required libraries (requests and aiohttp).
We'll build a sequential and an asynchronous version of a small program and compare their results and structure. Both programs will do the same: iterate through a list of ISBNs, request each book's details from the Google Books API, parse the response, and print the results to the screen.
The algorithm would look something like the diagram below.
We'll now compare two possible approaches for building this algorithm. First, Option A, which executes the requests sequentially. Then, Option B, which uses asyncio to run requests asynchronously.
A sequential version of that algorithm could look as follows:
import json

import requests
from requests.exceptions import HTTPError

GOOGLE_BOOKS_URL = "https://www.googleapis.com/books/v1/volumes?q=isbn:"
LIST_ISBN = [
    '9780002005883',
    '9780002238304',
    '9780002261982',
    '9780006163831',
    '9780006178736',
    '9780006280897',
    '9780006280934',
    '9780006353287',
    '9780006380832',
    '9780006470229',
]


def extract_fields_from_response(item):
    """Extract fields from API's response"""
    volume_info = item.get("volumeInfo", {})
    title = volume_info.get("title", None)
    subtitle = volume_info.get("subtitle", None)
    description = volume_info.get("description", None)
    published_date = volume_info.get("publishedDate", None)
    return (
        title,
        subtitle,
        description,
        published_date,
    )


def get_book_details_seq(isbn, session):
    """Get book details using Google Books API (sequentially)"""
    url = GOOGLE_BOOKS_URL + isbn
    try:
        response = session.get(url)
        response.raise_for_status()
        print(f"Response status ({url}): {response.status_code}")
    except HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        return {}  # nothing to parse if the request failed
    except Exception as err:
        print(f"An error occurred: {err}")
        return {}
    response_json = response.json()
    # Take the first matching volume, or an empty dict if "items" is missing
    items = response_json.get("items", [{}])[0]
    return items


with requests.Session() as session:
    for isbn in LIST_ISBN:
        try:
            response = get_book_details_seq(isbn, session)
            parsed_response = extract_fields_from_response(response)
            print(f"Response: {json.dumps(parsed_response, indent=2)}")
        except Exception as err:
            print(f"Exception occurred: {err}")
Now, let's break down the code to understand what's going on.
As usual, we start by importing the required libraries. Then, we define two variables: GOOGLE_BOOKS_URL, which specifies the URL of the Google Books API that we'll use for the requests, and LIST_ISBN, which is a sample list of ISBNs for testing the program.
For illustration purposes, this is what a request to the Google Books API looks like: https://www.googleapis.com/books/v1/volumes?q=isbn:9780002005883
Next, we define the extract_fields_from_response function. This function takes as input the response from the API and extracts the fields we're interested in.
The parsing process in extract_fields_from_response is based on the response's structure from the Google Books API, which looks as follows:
{
  "kind": "books#volumes",
  "totalItems": 1,
  "items": [
    {
      "kind": "books#volume",
      "id": "3Mx4QgAACAAJ",
      "etag": "FWJF/JY16xg",
      "selfLink": "https://www.googleapis.com/books/v1/volumes/3Mx4QgAACAAJ",
      "volumeInfo": {
        "title": "Mapping the Big Picture",
        "subtitle": "Integrating Curriculum and Assessment, K-12",
        ...
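To check the parsing logic without calling the API, you can feed it a hand-written dictionary. The sketch below uses a trimmed sample shaped like the excerpt above (only the fields shown there, so the description and published date come back as None). The extract_fields name is a shortened stand-in for the function defined earlier.

```python
import json

# Trimmed, hand-written sample shaped like the response excerpt;
# it is not a complete API response.
sample_response = {
    "kind": "books#volumes",
    "totalItems": 1,
    "items": [
        {
            "kind": "books#volume",
            "id": "3Mx4QgAACAAJ",
            "volumeInfo": {
                "title": "Mapping the Big Picture",
                "subtitle": "Integrating Curriculum and Assessment, K-12",
            },
        }
    ],
}


def extract_fields(item):
    """Pull the four fields of interest out of a single volume item."""
    volume_info = item.get("volumeInfo", {})
    return (
        volume_info.get("title"),
        volume_info.get("subtitle"),
        volume_info.get("description"),
        volume_info.get("publishedDate"),
    )


# Same lookup the tutorial uses: first item, or an empty dict if "items" is missing
first_item = sample_response.get("items", [{}])[0]
parsed = extract_fields(first_item)
print(json.dumps(parsed, indent=2))

# A response without an "items" key falls back to the [{}] default
empty = extract_fields({}.get("items", [{}])[0])
print(empty)
```

Because every lookup uses .get with a default, missing fields turn into None instead of raising a KeyError. Note that the [{}] default only covers a missing "items" key; an empty list would still fail on the [0] index.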
Finally, we go into the most relevant parts of the program: how we make requests to the Google Books API.
def get_book_details_seq(isbn, session):
    """Get book details using Google Books API (sequentially)"""
    url = GOOGLE_BOOKS_URL + isbn
    try:
        response = session.get(url)
        response.raise_for_status()
        print(f"Response status ({url}): {response.status_code}")
    except HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        return {}  # nothing to parse if the request failed
    except Exception as err:
        print(f"An error occurred: {err}")
        return {}
    response_json = response.json()
    # Take the first matching volume, or an empty dict if "items" is missing
    items = response_json.get("items", [{}])[0]
    return items


with requests.Session() as session:
    for isbn in LIST_ISBN:
        try:
            response = get_book_details_seq(isbn, session)
            parsed_response = extract_fields_from_response(response)
            print(f"Response: {json.dumps(parsed_response, indent=2)}")
        except Exception as err:
            print(f"Exception occurred: {err}")
There are two major pieces here:
get_book_details_seq, which is the function that executes the requests. It takes as input an ISBN and a session object[4] and returns the response from the API as a JSON structure. It also handles possible errors, like providing a wrong URL or going over your daily quota of requests.
with requests.Session() as session, which is where the actual execution of requests happens. It iterates through the list of ISBNs, getting the book details, parsing them, and finally printing them to the screen.
For me, executing this process takes between 4 and 6 seconds. If you only need to do this a couple of times, you will not find much benefit from using asyncio. However, if instead of 10 requests, you need to do 10,000, having some concurrency in your program pays off. In the next section, we'll see how to make this algorithm faster using asyncio.
An asynchronous version of the same algorithm may look as follows:
import asyncio
import json

import aiohttp
from aiohttp import ClientSession

GOOGLE_BOOKS_URL = "https://www.googleapis.com/books/v1/volumes?q=isbn:"
LIST_ISBN = [
    '9780002005883',
    '9780002238304',
    '9780002261982',
    '9780006163831',
    '9780006178736',
    '9780006280897',
    '9780006280934',
    '9780006353287',
    '9780006380832',
    '9780006470229',
]


def extract_fields_from_response(response):
    """Extract fields from API's response"""
    item = response.get("items", [{}])[0]
    volume_info = item.get("volumeInfo", {})
    title = volume_info.get("title", None)
    subtitle = volume_info.get("subtitle", None)
    description = volume_info.get("description", None)
    published_date = volume_info.get("publishedDate", None)
    return (
        title,
        subtitle,
        description,
        published_date,
    )


async def get_book_details_async(isbn, session):
    """Get book details using Google Books API (asynchronously)"""
    url = GOOGLE_BOOKS_URL + isbn
    try:
        response = await session.request(method='GET', url=url)
        response.raise_for_status()
        print(f"Response status ({url}): {response.status}")
    except aiohttp.ClientResponseError as http_err:
        # aiohttp's counterpart to requests' HTTPError
        print(f"HTTP error occurred: {http_err}")
        return {}  # nothing to parse if the request failed
    except Exception as err:
        print(f"An error occurred: {err}")
        return {}
    response_json = await response.json()
    return response_json


async def run_program(isbn, session):
    """Wrapper for running program in an asynchronous manner"""
    try:
        response = await get_book_details_async(isbn, session)
        parsed_response = extract_fields_from_response(response)
        print(f"Response: {json.dumps(parsed_response, indent=2)}")
    except Exception as err:
        print(f"Exception occurred: {err}")


# Top-level "async with" and "await" work in a Jupyter Notebook. In a plain
# script, wrap these two lines in an "async def main()" and call
# asyncio.run(main()).
async with ClientSession() as session:
    await asyncio.gather(*[run_program(isbn, session) for isbn in LIST_ISBN])
First, check the get_book_details_async function. It is defined with the async keyword, which tells Python that the function is a coroutine. Then, in the function's body, there are two await keywords. These tell the coroutine to suspend execution and give control back to the event loop until the operation it is awaiting finishes.
A coroutine is closely related to a generator function in Python: instead of only producing values, it can also consume them[4]. The interesting thing about it is that its execution pauses while it waits for new data to be sent to it. In our case, this allows other parts of the program to continue executing in a seemingly concurrent manner.
In this case, the execution of get_book_details_async is suspended while the request is being performed: await session.request(method='GET', url=url). It is suspended again while the response body is being read and parsed into a JSON structure: await response.json().
Next, we have the run_program coroutine. This one is simply a wrapper around the pipeline of getting a response from the API, parsing it, and printing the results to the screen. It awaits the execution of the get_book_details_async coroutine.
Finally, we have the code block under async with ClientSession() as session:. Using the asyncio.gather function, we tell the program to schedule all the tasks based on the list of coroutines we provided. This is what allows us to execute tasks concurrently.
For me, running this process takes around 800-1000 milliseconds.
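To see what asyncio.gather does in isolation, here is a small sketch with a made-up fake_lookup coroutine that sleeps instead of calling the API. The delays are arbitrary and chosen so that the first ISBN finishes last.

```python
import asyncio


async def fake_lookup(isbn, delay):
    """Hypothetical stand-in for get_book_details_async; sleeps instead of requesting."""
    await asyncio.sleep(delay)
    return f"details for {isbn}"


async def main():
    isbns = ["9780002005883", "9780002238304", "9780002261982"]
    delays = [0.03, 0.02, 0.01]  # the first ISBN takes the longest
    coroutines = [fake_lookup(isbn, delay) for isbn, delay in zip(isbns, delays)]
    # gather schedules all coroutines and waits until every one has finished
    return await asyncio.gather(*coroutines)


# In a Jupyter Notebook, use "results = await main()" instead of asyncio.run.
results = asyncio.run(main())
for line in results:
    print(line)
```

Even though the lookups finish in reverse order, asyncio.gather returns the results in the same order as the coroutines it was given, which makes it easy to match each result to its ISBN.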
Comparing both versions, we see that the asynchronous one is around 4 to 7.5 times faster than the sequential version. If you increase the number of requests, you'll likely get an even higher speedup. Besides, the version using asyncio is not much more complicated than the sequential version, which makes asyncio an excellent option for the kind of task we reviewed in this tutorial.
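If you want to reproduce this kind of comparison without spending API quota, the following sketch simulates it. It assumes every request takes a fixed 50 ms (the DELAY and N_REQUESTS values are arbitrary), so the numbers it prints only illustrate the shape of the speedup. Real results depend on the network and on the API's rate limits.

```python
import asyncio
import time

DELAY = 0.05       # assumed latency per simulated request, in seconds
N_REQUESTS = 10    # same number of requests as LIST_ISBN


async def fake_request(i):
    """Hypothetical request: waits DELAY seconds, then returns its index."""
    await asyncio.sleep(DELAY)
    return i


async def run_sequentially():
    # Each request is awaited before the next one starts
    return [await fake_request(i) for i in range(N_REQUESTS)]


async def run_concurrently():
    # All requests are scheduled at once and wait in parallel
    return await asyncio.gather(*[fake_request(i) for i in range(N_REQUESTS)])


def timed(coroutine_function):
    """Run a coroutine function to completion and measure the elapsed time."""
    start = time.perf_counter()
    result = asyncio.run(coroutine_function())
    return result, time.perf_counter() - start


seq_result, seq_time = timed(run_sequentially)
con_result, con_time = timed(run_concurrently)
print(f"Sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

The sequential run pays the delay once per request, while the concurrent run pays it roughly once in total, which is why the gap widens as the number of requests grows.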
Here are some tips I gathered while working with asyncio: