One common pattern when using Python's asyncio library and coroutines is to create long-lived tasks that process elements of an asynchronous queue as they arrive. Normally these tasks run an infinite loop around await q.get(), waiting for the next element to process. Once the task that inserts new elements into the queue completes, the processing tasks must be cancelled, since they would otherwise wait on the empty queue forever. This is how to do it:


import asyncio

# Note: this fragment must run inside a coroutine, since it uses await

# This task generates items for processing
tgen = asyncio.create_task(genitems())

# These are the processing tasks
tproc1 = asyncio.create_task(proctask())
tproc2 = asyncio.create_task(proctask())

# Wait for the generation task to finish
await tgen
# Now need to cancel the processing tasks
tproc1.cancel()
tproc2.cancel()

# Now await the cancelled tasks so that the cancellation is actually processed
await asyncio.gather(tproc1, tproc2, return_exceptions=True)

What the code above illustrates is that calling cancel() only requests cancellation; the task must then be given a chance to actually process it. An asyncio.CancelledError is raised inside each cancelled task at the point where it is suspended (here, await q.get()). Calling gather with return_exceptions=True collects those exceptions as ordinary results instead of raising them in the parent task.
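
The snippet above leaves genitems() and proctask() undefined, so here is a minimal self-contained sketch of the whole pattern. The bodies of genitems() and proctask(), the results list and the main() wrapper are all made up for illustration. The q.join() call is also an addition that is not part of the snippet above: it is there only so that every queued item has been handled before the workers are cancelled.

```python
import asyncio


async def genitems(q: asyncio.Queue, n: int) -> None:
    # Hypothetical producer: puts n integers on the queue, then finishes.
    for i in range(n):
        await q.put(i)


async def proctask(q: asyncio.Queue, results: list) -> None:
    # Hypothetical worker: loops forever, so it only stops when cancelled.
    while True:
        item = await q.get()
        results.append(item * 2)
        q.task_done()


async def main(n: int = 5):
    q = asyncio.Queue()
    results = []

    tgen = asyncio.create_task(genitems(q, n))
    tproc1 = asyncio.create_task(proctask(q, results))
    tproc2 = asyncio.create_task(proctask(q, results))

    # Wait for the producer, then (assumption) for the queue to drain.
    await tgen
    await q.join()

    # cancel() only requests cancellation; the tasks are not done yet.
    tproc1.cancel()
    tproc2.cancel()
    done_before = (tproc1.done(), tproc2.done())

    # Awaiting the tasks lets the event loop deliver the cancellation.
    outcomes = await asyncio.gather(tproc1, tproc2, return_exceptions=True)
    done_after = (tproc1.cancelled(), tproc2.cancelled())

    return results, outcomes, done_before, done_after


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch done_before shows that the workers are still pending right after cancel(), while outcomes holds one asyncio.CancelledError instance per worker once gather has returned.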