Asyncio in Python is deceptively intuitive, and with its pseudo real time / event processing /single threaded nature allows most features of concurrent programming while avoiding or hiding most of the complexities associated with multi-threaded/multi-processing programming. In this article the versatility of asyncio program is examined by looking at a very specific problem:
Assume that there is a task list and each task is being worked on. When one or more tasks complete, another task or tasks should start. This is a very typical problem where there is a limit on the number of concurrent tasks in execution, but the number of tasks are not. Here is a contemporary example: Imagine the Covid 19 vaccine administration in a Parking lot. Four vaccination stations have been set up. Each station administers a vaccine shot which takes between 1-3 minutes. The task is to complete boxes of vaccines with each box having 100 doses.
Figure 1: Schematic of the parking lot vaccine administration
Let us see how Python asyncio will administer the shots! Before you get too eager, let me reveal and briefly discuss the real title of this article: 'Dynamic addition of new tasks to a list of awaited tasks'.
Problem description: A list of tasks are being executed. Situations will arise where you want to add another task to the list dynamically. Using Python asyncio we write this problem in pseudo code:
while True:
#a tasklist of t1,t2,t3 is being executed
asyncio.wait(t1(),t2(),t3(),return_when=asyncio.FIRST_COMPLETED)
<some tasks completes and execution gets here..
Now we want another task or run the same task again into the list>
To solve this problem, we maintain a dict of the running tasks, and insert new tasks into the list as tasks get completed. This is coded below:
#Create a dict of running tasks. These are function names only and hence they will not be called but a list is maintained.
runningTasks={'t1':t1,'t2':t2,'t3':t3} #
#start the tasks to run concurrently. Here the async functions are called or initiated
tasks={k:asyncio.create_task(v(),name=k ) for k,v in taskDict.items()}
#wait for one or more tasks to complete and insert new tasks into the dict
while True:
done,pending = await asyncio.wait(tasks.values(),return_when=asyncio.FIRST_COMPLETED)
for t in done: #we are only concerned about the tasks done. Only these need to be restarted
if not t.exception():
print(f'Task {t.get_name()} is done! Starting the Next Run of the task..')
tasks[tNm]=asyncio.create_task(taskDict[tNm](),name=tNm)
else: #some exception occurred during task execution and has to be resolved
print(f'Task {t.get_name()} got exception {t.exception()}. Please take care of it!')
#recover/restart the task
tNm=t.get_name()
tasks[tNm]=asyncio.create_task(taskDict[tNm](),name=tNm)
I recommend reading the above code about 3 times and refer asyncio docs, especially if you are not familiar with the the asyncio functions used above.
Now back to the problem of Vaccination. The above 'Dynamic addition of new tasks to a list of awaited tasks' is used to manage the allocation of vaccines to each station. This in in the async procedure dynamicTaskList. The vaccination station is simulated by the async procedure vaccinateStation. Each time a vaccinateStation uses up a box of vaccines, it returns, and then is restarted with a new box of vaccines.
import random, asyncio,time
async def vaccinateStation():
#Task that administers vaccine one by one from a box of 100 vaccines
boxCapacity=100 #capacity of truck
vaccinesLeft=boxCapacity #initially all are left to administer
while True:
await asyncio.sleep(random.uniform(0.001,0.1)) # 0.001 to 0.1 seconds for each shot
vaccinesLeft -=1
if vaccinesLeft ==0 : break #box is now empty
return boxCapacity #return number of vaccines administered
async def dynamicTaskList():
taskDict={'b1':vaccinateStation,'b2':vaccinateStation,'b3':vaccinateStation} # 3 stations
#start the tasks. Here the async functions are called or initiated
tasks={k:asyncio.create_task(v(),name=k ) for k,v in taskDict.items()}
#100 boxes of Vaccines
noOfVaccineBoxes=100
#time tracker
timeNow=time.time()
for i in range(noOfVaccineBoxes):
#await the tasks
done,pending = await asyncio.wait(tasks.values(),return_when=asyncio.FIRST_COMPLETED)
for t in done: #we are only concerned about the tasks done. Only these need to be restarted
tNm=t.get_name()
if t.exception():
print(f'Task {tNm} got exception {t.exception()}. Please take care of it!')
#recover/restart the task
tasks[tNm]=asyncio.create_task(taskDict[tNm](),name=tNm)
else:
print(f'vaccine box {i} is done! Vaccinated {t.result()} and and time is {(time.time() - timeNow):.2f} seconds. Starting on the Next box..')
tasks[tNm]=asyncio.create_task(taskDict[tNm](),name=tNm)
asyncio.run(dynamicTaskList())
Running this program gives the following output:
(venv) vv@vv-GL63-8RD:~/git/vv3/test$ python dynamicVaccinator.py
vaccine box 0 is done! Vaccinated 100 and and time is 5.11 seconds. Starting on the Next box..
vaccine box 1 is done! Vaccinated 100 and and time is 5.26 seconds. Starting on the Next box..
vaccine box 2 is done! Vaccinated 100 and and time is 5.33 seconds. Starting on the Next box..
vaccine box 3 is done! Vaccinated 100 and and time is 10.31 seconds. Starting on the Next box..
vaccine box 4 is done! Vaccinated 100 and and time is 10.37 seconds. Starting on the Next box..
Here we notice that about 100 vaccinations are being administered in about 5 seconds. We are shooting out vaccines from a machine gun! The vaccination rate is controlled in line 8:
await asyncio.sleep(random.uniform(0.001,0.1)) # 0.001 to 0.1 seconds for each shot
Other parameters like number of vaccines per box are also defined.
Try executing this program (written/tested on Python 3.8), change parameters, add slow/fast vaccinateStation's etc. to explore the asyncio features. Please note that if you put in realistic vaccination rates (1-3 minutes), this program will take hours to execute, since it operates in pseudo real-time.
Enjoy!