
Concurrent Task Execution in Python With Asyncio Is A Breeze

by GIWYNI, January 31st, 2021


Asyncio in Python is deceptively intuitive. Because it is single-threaded and event-driven, running in pseudo real time, it offers most of the features of concurrent programming while avoiding or hiding most of the complexity of multi-threaded and multi-process programming. This article examines the versatility of asyncio programming by looking at one very specific problem:

Assume that there is a task list and each task is being worked on. When one or more tasks complete, another task or tasks should start. This is a typical problem in which the number of tasks running concurrently is limited, but the total number of tasks is not. Here is a contemporary example: imagine Covid-19 vaccine administration in a parking lot. Four vaccination stations have been set up. Each station administers one shot at a time, and each shot takes between 1 and 3 minutes. The job is to work through boxes of vaccine, each box holding 100 doses.

Figure 1: Schematic of the parking lot vaccine administration

Let us see how Python asyncio will administer the shots! Before you get too eager, let me reveal and briefly discuss the real title of this article: 'Dynamic addition of new tasks to a list of awaited tasks'.

Problem description: A list of tasks is being executed, and situations arise where you want to add another task to the list dynamically. Using Python asyncio, we can write this problem in pseudo code:

while True:
    # a task list of already-started tasks t1, t2, t3 is being executed
    done, pending = await asyncio.wait([t1, t2, t3], return_when=asyncio.FIRST_COMPLETED)
    # <one or more tasks complete and execution gets here.
    #  Now we want to add another task, or run the same task again, in the list>
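For readers who have not used asyncio.wait before, the following is a minimal sketch of what it hands back when called with return_when=asyncio.FIRST_COMPLETED. The names work and first_completed_demo, and the delays, are made up for illustration and are not part of the vaccination program.

```python
import asyncio

async def work(delay):
    # Hypothetical stand-in for a real task: sleep, then report the delay.
    await asyncio.sleep(delay)
    return delay

async def first_completed_demo():
    # asyncio.wait works on Task objects, so wrap each coroutine with create_task.
    tasks = [asyncio.create_task(work(d), name=f"t{i}")
             for i, d in enumerate((0.05, 0.3, 0.6), start=1)]
    # Returns as soon as at least one task has finished.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finished = sorted(t.get_name() for t in done)
    waiting = sorted(t.get_name() for t in pending)
    # Cancel the leftovers so the demo exits cleanly.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return finished, waiting

if __name__ == "__main__":
    print(asyncio.run(first_completed_demo()))
```

The call returns two sets: the tasks that have finished and the tasks that are still running. With delays this far apart, the shortest task lands in the first set and the other two in the second.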

To solve this problem, we maintain a dict of the running tasks and insert new tasks into the dict as tasks complete. This is coded below:

    #Create a dict of the tasks to run. The values are the async functions themselves, not calls, so nothing runs yet.
    taskDict={'t1':t1,'t2':t2,'t3':t3}

    #start the tasks to run concurrently. Here the async functions are called or initiated
    tasks={k:asyncio.create_task(v(),name=k ) for k,v in taskDict.items()}
    #wait for one or more tasks to complete and insert new tasks into the dict
    while True:
        done,pending = await asyncio.wait(tasks.values(),return_when=asyncio.FIRST_COMPLETED)
        for t in done:  #we are only concerned about the tasks done. Only these need to be restarted
            tNm=t.get_name() #look up the name first; both branches below need it
            if not t.exception():
                print(f'Task {tNm} is done! Starting the Next Run of the task..')
                tasks[tNm]=asyncio.create_task(taskDict[tNm](),name=tNm)
            else:  #some exception occurred during task execution and has to be resolved
                print(f'Task {tNm} got exception {t.exception()}. Please take care of it!')
                #recover/restart the task
                tasks[tNm]=asyncio.create_task(taskDict[tNm](),name=tNm)

I recommend reading the above code about three times and referring to the asyncio docs, especially if you are not familiar with the asyncio functions used above.
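To see the restart loop run from start to finish, here is a small self-contained sketch of the same idea. The job function, the three task names, and the max_completions cut-off are assumptions added for illustration. The cut-off exists only so that the demo terminates, whereas the fragment above loops forever.

```python
import asyncio

async def job(label, delay, fail=False):
    # Hypothetical unit of work: sleep, then either fail or return its label.
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{label} failed")
    return label

async def restart_demo(max_completions=6):
    # name -> zero-argument callable that produces a fresh coroutine
    task_funcs = {
        "fast": lambda: job("fast", 0.01),
        "flaky": lambda: job("flaky", 0.02, fail=True),
        "slow": lambda: job("slow", 0.03),
    }
    tasks = {name: asyncio.create_task(fn(), name=name)
             for name, fn in task_funcs.items()}
    log = []
    while len(log) < max_completions:
        done, _ = await asyncio.wait(tasks.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for t in done:
            name = t.get_name()
            log.append((name, "error" if t.exception() else "ok"))
            # Replace the finished task with a fresh run under the same name.
            tasks[name] = asyncio.create_task(task_funcs[name](), name=name)
    # Stop whatever is still running before returning.
    for t in tasks.values():
        t.cancel()
    await asyncio.gather(*tasks.values(), return_exceptions=True)
    return log

if __name__ == "__main__":
    for entry in asyncio.run(restart_demo()):
        print(entry)
```

Because the dict is keyed by task name, a restarted task simply overwrites the finished one, and the next call to asyncio.wait picks up the replacement automatically.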

Now back to the vaccination problem. The 'Dynamic addition of new tasks to a list of awaited tasks' technique above is used to manage the allocation of vaccine boxes to the stations. This is done in the async procedure dynamicTaskList. A vaccination station is simulated by the async procedure vaccinateStation. Each time a vaccinateStation uses up a box of vaccines, it returns and is then restarted with a new box. The simulation below runs three stations.

import random, asyncio,time

async def vaccinateStation():
    #Task that administers vaccine one by one from a box of 100 vaccines
    boxCapacity=100 #capacity of one box
    vaccinesLeft=boxCapacity #initially all are left to administer
    while True:
        await asyncio.sleep(random.uniform(0.001,0.1)) # 0.001 to 0.1 seconds for each shot
        vaccinesLeft -=1
        if vaccinesLeft ==0 : break #box is now empty
    return boxCapacity #return number of vaccines administered

async def dynamicTaskList():
    taskDict={'b1':vaccinateStation,'b2':vaccinateStation,'b3':vaccinateStation} # 3 stations
    #start the tasks. Here the async functions are called or initiated
    tasks={k:asyncio.create_task(v(),name=k ) for k,v in taskDict.items()}
    #100 boxes of Vaccines
    noOfVaccineBoxes=100
    #time tracker
    timeNow=time.time()
    for i in range(noOfVaccineBoxes):
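        #await the tasks. Note: i counts rounds of asyncio.wait, so stations finishing in the same round print the same box number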
        done,pending = await asyncio.wait(tasks.values(),return_when=asyncio.FIRST_COMPLETED)
        for t in done:  #we are only concerned about the tasks done. Only these need to be restarted
            tNm=t.get_name()
            if t.exception():
                print(f'Task {tNm} got exception {t.exception()}. Please take care of it!')
                #recover/restart the task
                tasks[tNm]=asyncio.create_task(taskDict[tNm](),name=tNm)
            else:
                print(f'vaccine box {i} is done! Vaccinated  {t.result()} and and time is {(time.time() - timeNow):.2f} seconds. Starting on the Next box..')
                tasks[tNm]=asyncio.create_task(taskDict[tNm](),name=tNm)

asyncio.run(dynamicTaskList())

Running this program gives the following output:

(venv) vv@vv-GL63-8RD:~/git/vv3/test$ python dynamicVaccinator.py 
vaccine box 0 is done! Vaccinated  100 and and time is 5.11 seconds. Starting on the Next box..
vaccine box 1 is done! Vaccinated  100 and and time is 5.26 seconds. Starting on the Next box..
vaccine box 2 is done! Vaccinated  100 and and time is 5.33 seconds. Starting on the Next box..
vaccine box 3 is done! Vaccinated  100 and and time is 10.31 seconds. Starting on the Next box..
vaccine box 4 is done! Vaccinated  100 and and time is 10.37 seconds. Starting on the Next box..

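Here we notice that each station finishes a box of 100 vaccinations in about 5 seconds, which matches an average delay of roughly 0.05 seconds per shot, and that the three stations work in parallel. We are shooting out vaccines from a machine gun! The vaccination rate is controlled in line 8: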

await asyncio.sleep(random.uniform(0.001,0.1)) # 0.001 to 0.1 seconds for each shot

Other parameters, such as the number of vaccines per box (boxCapacity) and the number of boxes (noOfVaccineBoxes), are also defined in the program.
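The 5-second figure can be checked with a little arithmetic. The sketch below assumes only that the per-shot delay is uniformly distributed, as in the random.uniform call above. The helper name expected_box_seconds is made up for this illustration.

```python
def expected_box_seconds(low, high, doses_per_box):
    # The mean of a uniform distribution on [low, high] is (low + high) / 2,
    # so one box takes doses_per_box times that on average.
    return doses_per_box * (low + high) / 2

# Simulated rate: 0.001 to 0.1 seconds per shot, 100 doses per box.
print(f"{expected_box_seconds(0.001, 0.1, 100):.2f} seconds")      # 5.05 seconds

# Rate from the parking-lot description: 1 to 3 minutes per shot.
print(f"{expected_box_seconds(60, 180, 100) / 60:.0f} minutes")    # 200 minutes
```

The first figure agrees with the times of 5.11, 5.26 and 5.33 seconds reported for the first three boxes in the output above.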

Try executing this program (written and tested on Python 3.8), change the parameters, add slow or fast vaccinateStation tasks, and so on, to explore the asyncio features. Please note that if you put in realistic vaccination rates (1-3 minutes per shot), this program will take hours to execute, since it operates in pseudo real time.
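As one possible starting point for such experiments, here is a hedged sketch of a variant in which each station has its own speed and exactly the requested number of boxes is handed out. It is not the program above: the names station and run_clinic, the speeds mapping, and the box-numbering scheme are assumptions made for this example.

```python
import asyncio
import random

async def station(name, box_id, doses, low, high):
    # One box at one station; low/high are per-shot delays in seconds.
    for _ in range(doses):
        await asyncio.sleep(random.uniform(low, high))
    return name, box_id, doses

async def run_clinic(speeds, boxes, doses=100):
    # speeds: station name -> (low, high) per-shot delay (hypothetical parameter)
    next_box = 0
    running = set()
    results = []

    def start(name):
        # Hand the next unopened box to the named station.
        nonlocal next_box
        low, high = speeds[name]
        running.add(asyncio.create_task(station(name, next_box, doses, low, high)))
        next_box += 1

    for name in speeds:
        if next_box < boxes:
            start(name)
    while running:
        # Keep only the unfinished tasks in `running` for the next round.
        done, running = await asyncio.wait(running,
                                           return_when=asyncio.FIRST_COMPLETED)
        for t in done:
            name, box_id, given = t.result()  # re-raises if the station failed
            results.append((name, box_id, given))
            if next_box < boxes:
                start(name)  # this station is free, give it the next box
    return results

if __name__ == "__main__":
    speeds = {"fast": (0.001, 0.01), "slow": (0.01, 0.05)}
    for row in asyncio.run(run_clinic(speeds, boxes=6, doses=20)):
        print(row)
```

Since a box is only handed out when a station becomes free, faster stations naturally end up processing more boxes, and the loop ends by itself once the last box has been returned.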

Enjoy!