Multiprocessing for heavy API requests with Python and the PokéAPI

by Greyson Nevins-Archer, March 25th, 2020

Too Long; Didn't Read

Heavy API scraping with Python can be made easier with multiprocessing. The PokéAPI allows at most 100 calls per 60 seconds, and it returns 964 pokémon. I wanted to share some of my learnings through an example project of scraping the PokéAPI. Below I wrote a bit of code that pulls all of the available pokémon while minding the API's limit of 100 calls per 60 seconds. You'll see that the iteration is fairly slow.

While working on a recent project, I realized that heavy Python workloads like scraping could be made easier through Python's multiprocessing library. The documentation and community engagement around multiprocessing are fairly sparse, so I wanted to share some of my learnings through an example project of scraping the PokéAPI. Below I wrote a bit of code that pulls all of the available pokémon while minding the API's limit of 100 calls per 60 seconds. You'll see that the iteration is fairly slow, as the API returns 964 pokémon.

Before Multiprocessing

I simply create three functions: get_number_pokemon, get_pokemon, and get_all_pokemon. The first, get_number_pokemon, returns all of the URLs from the API for the next function, get_all_pokemon, which iterates over the URLs and pulls the information together by using get_pokemon to fetch each URL's response data.

import requests as req
import timeit
import time
import pandas as pd
from IPython.display import Image, HTML
import random
from tqdm import tqdm
from ratelimit import limits, sleep_and_retry



## Rate limit to help with overcalling
## pokemon api is 100 calls per 60 seconds max
@sleep_and_retry
@limits(calls=100, period=60)
def call_api(url):
    response = req.get(url)

    if response.status_code == 404:
        return 'Not Found'
    if response.status_code != 200:
        print('here', response.status_code, url)
        raise Exception('API response: {}'.format(response.status_code))
    return response


API_POKEMON = 'https://pokeapi.co/api/v2/pokemon/{pokemon}'

def get_number_pokemon():
    res = req.get(API_POKEMON.format(pokemon=''))
    number_pokemon = res.json()['count']
    res_url = call_api(API_POKEMON.format(pokemon='?offset=0&limit={limit}'.format(limit=str(number_pokemon))))
    pokemon_links_values = [link['url'] for link in res_url.json()['results']]
    return pokemon_links_values

def get_pokemon(link=''):
    
    info = None
    resolved = False
    
    try:
        while not resolved:
            

            res = None
            tooManyCalls = False

            try:
                res = call_api(link)
                if res == 'Not Found':
                    resolved = True
                    break
            except Exception as e:
                print(e)
                if str(e) == 'too many calls':
                    tooManyCalls = True
            if tooManyCalls:
                time.sleep(60)
                    
            elif res.status_code < 300:

                pokemon_info = res.json()

                info = {
                    'Image' : pokemon_info['sprites']['front_default'],
                    'id' : pokemon_info['id'],
                    'name' : pokemon_info['name'],
                    'height' : pokemon_info['height'],
                    'base_experience' : pokemon_info['base_experience'],
                    'weight' : pokemon_info['weight'],
                    'species' : pokemon_info['species']['name']

                }

                resolved = True

            elif res.status_code == 429:
                time.sleep(60)
            else:
                sleep_val = random.randint(1,10)
                time.sleep(sleep_val)
                
    except Exception as e:
        print(e)
        return info
    finally:
        return info
            



def get_all_pokemon(links_pokemon=None):
    
    
    list_pokemon = []
    for link in tqdm(links_pokemon):
        
        pokemon = get_pokemon(link)
        if pokemon is not None:
            list_pokemon.append(pokemon)
        time.sleep(0.3)
        
            
    pd.set_option('display.max_colwidth', None)

    df_pokemon = pd.DataFrame(list_pokemon)
      
    return df_pokemon
    

def image_formatter(im):
    return f'<img src="{im}">'

def main_pokemon_run():
    links_pokemon = get_number_pokemon()

    df_pokemon = get_all_pokemon(links_pokemon=links_pokemon)
    
    df_pokemon.sort_values(['id'],inplace=True)
    return df_pokemon, HTML(df_pokemon.iloc[0:4].to_html(formatters={'Image': image_formatter}, escape=False))
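
To run the single-process version and see how long it takes, something like the following works (a minimal sketch; it assumes the functions above are defined in the same module):

start = timeit.default_timer()
df_pokemon, preview = main_pokemon_run()
elapsed = timeit.default_timer() - start
print('Pulled {} pokemon in {:.1f} seconds'.format(len(df_pokemon), elapsed))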
    

When I ran this, the full pull took roughly 9 minutes and 30 seconds, at a rate of about 1.69 iterations per second. We can improve this drastically by adding multiprocessing.

With Multiprocessing

Now with multiprocessing, we can turn the get_all_pokemon function into a multiprocessing pool function. We use the cpu_count() function built into multiprocessing to define the number of workers needed. Since we want to get this done as quickly as possible, using cpu_count() - 1 workers keeps the program running close to optimally without taking up all of our CPUs. The max(cpu_count() - 1, 1) expression ensures that the worker count can never be 0. Next, I use multiprocessing's built-in Manager to share the returned Pokémon in memory across processes.
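
In isolation, the worker count and shared list look roughly like this (a sketch using the same names as the code further down):

from multiprocessing import Manager, Pool, cpu_count

workers = max(cpu_count() - 1, 1)  # leave one core free, but never go below 1 worker
manager = Manager()
listManager = manager.list()       # a list proxy that worker processes can append to
pool = Pool(workers)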

While you can use the returned value of the multiprocessing map, a manager is a nice way to use multiprocessing for other, more complicated tasks. Check the documentation for the available multiprocessing.Manager types.

Another step needed is to create a partial function. Since pool.imap passes an iterable to map over, we need to bind our other parameters to the get_pokemon_multiprocess function before we iterate over it. The functools.partial helper allows us to do this by creating a partial function to send to the pool map. I use tqdm in this example to see the progress of the pool.
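
In isolation, the binding looks roughly like this (a sketch using the names from the code below):

from functools import partial

# Bind the shared list and the URL list so the pool only has to pass an index.
part_get_clean_pokemon = partial(get_pokemon_multiprocess, listManager, links_pokemon)
# part_get_clean_pokemon(5) is now equivalent to
# get_pokemon_multiprocess(listManager, links_pokemon, 5)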

I also show the same call in the code without tqdm. Once the pool has run, we need to make sure it is both closed and joined; you'll see I do this immediately after the loop and again in the finally block. This guarantees that even if there is an error, the worker processes are closed and terminated. The join call is necessary so that the pooled processes are joined back together; it also ensures that the manager has finished collecting all of the results before we read them.
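
Reduced to its skeleton, the pool-handling pattern looks something like this (a sketch; the work submission is the same as in the full code below):

pool = Pool(workers)
try:
    # Consume the lazy imap iterator so the work actually runs; tqdm shows progress.
    for _ in tqdm(pool.imap(part_get_clean_pokemon, range(len(links_pokemon))),
                  total=len(links_pokemon)):
        pass
finally:
    pool.close()  # stop accepting new tasks
    pool.join()   # wait for the workers to finish and shut down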

import requests as req
import timeit
import time
import pandas as pd
from IPython.display import Image, HTML
import random
from tqdm import tqdm
from ratelimit import limits, sleep_and_retry
from multiprocessing import Pool, Manager, cpu_count
from functools import partial


API_POKEMON = 'https://pokeapi.co/api/v2/pokemon/{pokemon}'

#  To see how it ran
# def infoDebugger(title):
#     print(title)
#     print('module name:', __name__)
#     if hasattr(os, 'getppid'):
#         print('parent process:', os.getppid())
#     print('process id:', os.getpid())


@sleep_and_retry
@limits(calls=100, period=60)
def call_api(url):
    response = req.get(url)
    
    if response.status_code == 404:
        return 'Not Found'
    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response


# https://docs.python.org/2/library/multiprocessing.html

def get_number_pokemon():
    res = req.get(API_POKEMON.format(pokemon=''))
    number_pokemon = res.json()['count']
    res_url = call_api(API_POKEMON.format(pokemon='?offset=0&limit={limit}'.format(limit=str(number_pokemon))))
    pokemon_links_values = [link['url'] for link in res_url.json()['results']]
    return pokemon_links_values

def get_pokemon_multiprocess(listManager=None, links_pokemon=None, process=0):
#     print('Called Pokemon', process)
    link = links_pokemon[process]
    info = None
    resolved = False
#     print(link)
    
    try:
        while not resolved:

              
            res = None
            tooManyCalls = False
            
            try:
                res = call_api(link)
                if res == 'Not Found':
                    resolved = True
                    break
            except Exception as e:
                print(e)
                if str(e) == 'too many calls':
                    tooManyCalls = True
                    
            if tooManyCalls:
                time.sleep(60)
                
            elif res.status_code < 300:

                pokemon_info = res.json()

                info = {
                    'Image' : pokemon_info['sprites']['front_default'],
                    'id' :  pokemon_info['id'],
                    'name' : pokemon_info['name'],
                    'height' : pokemon_info['height'],
                    'base_experience' : pokemon_info['base_experience'],
                    'weight' : pokemon_info['weight'],
                    'species' : pokemon_info['species']['name']

                }

                resolved = True
                
            elif res.status_code == 429:
                print(res.status_code)
                time.sleep(60)

            else:
                print(res.status_code)
                sleep_val = random.randint(1,10)
                time.sleep(sleep_val)
                
    except Exception as e:
        print(e)
    finally:
        if info is not None:
            listManager.append(info)
            time.sleep(0.5)
            return


def image_formatter(im):
    return f'<img src="{im}">'


def main_pokemon_run_multiprocessing():
    ## cannot be 0, so max(NUMBER,1) solves this
    workers = max(cpu_count()-1,1)

    ## create the pool
    manager = Manager()
    
    ## Need a manager to help get the values async, the values will be updated after join
    listManager = manager.list()
    pool = Pool(workers)
    try:

        links_pokemon = get_number_pokemon()
        part_get_clean_pokemon = partial(get_pokemon_multiprocess, listManager, links_pokemon)

        # we could simply call pool.imap as below and let it run:
        # pool.imap(part_get_clean_pokemon, list(range(0, len(links_pokemon))))
        # using tqdm instead lets us see the progress as imap works
        for _ in tqdm(pool.imap(part_get_clean_pokemon, list(range(0, len(links_pokemon)))), total=len(links_pokemon)):
            pass
        pool.close()
        pool.join()
    finally:
        pool.close()
        pool.join()
        
    pokemonList = list(listManager)
    
    df_pokemon = pd.DataFrame(pokemonList)
    df_pokemon.sort_values(['id'],inplace=True)
    return df_pokemon, HTML(df_pokemon.iloc[0:4].to_html(formatters={'Image': image_formatter}, escape=False))
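
One note if you run this as a script rather than in a notebook: on platforms where multiprocessing uses the spawn start method (Windows, and macOS on recent Python versions), the entry point needs to be guarded, otherwise the workers will re-import and re-execute the module. A minimal sketch:

if __name__ == '__main__':
    df_pokemon, preview = main_pokemon_run_multiprocessing()
    print(df_pokemon.head())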
    

The pooling worked successfully and improved things drastically: the runtime dropped from roughly 9 minutes to about 1 minute, and the iteration rate rose from 1.69 to 14.97 iterations per second.

Hope this was helpful. To see the full code, check out my notebook on GitHub.

Let me know if you come up with any other useful applications for this, and please share any feedback on my sample.

Greyson Nevins-Archer