Branching Tasks and Concurrent Exceptions
Branching off from an activity is only possible in a well-defined Scope. Each scope is bound to the lifetime of any child Task to prevent stray concurrency. Exceptions occurring concurrently are collected by the scope into a single Concurrent exception.
Scoping Concurrency
- class usim.Scope[source]
Concurrency scope that allows branching off and waiting for multiple activities
A new Scope must be opened in an async with block. During its block, a Scope may do() several activities concurrently. The Scope owns and supervises all branched off activities.

    async def show_clock(interval=1):
        "An infinite loop showing the current time"
        async for now in every(interval=interval):
            print(now)

    async with Scope() as scope:
        scope.do(time + 20)  # scope can launch multiple activities at once...
        scope.do(time + 20)
        scope.do(time + 20)
        scope.do(
            show_clock(),
            volatile=True
        )  # ...and mark some as expendable on exit
        # block is exited once the three delays finished concurrently
    # block is done after a total delay of 20

Both the block of a scope and all its activities form one unit of control. A Scope will only exit once its block and all non-volatile activities are done. If either encounters an unhandled exception, all are aborted; exceptions from child tasks are collapsed into a single Concurrent exception which is raised by the Scope. Only the fatal exception types SystemExit, KeyboardInterrupt, and AssertionError are not collapsed, but propagated directly.

During its lifetime, a Scope can be passed around freely. Most importantly, it can be passed to child activities. This allows a child to do() things in a parent scope, and to await the end of the scope.

    def do_some(scope):
        "Perform several actions in a parent scope"
        for delay in range(0, 20, 5):
            scope.do(time + delay)

    async def on_done(scope):
        "Wait for a scope to end and report it"
        await scope
        print('Scope is done at', time.now)

    async with Scope() as scope:
        do_some(scope)  # pass scope around to do activities in it
        scope.do(on_done(scope))  # pass scope around to await its end

- PROMOTE_CONCURRENT = (<class 'SystemExit'>, <class 'KeyboardInterrupt'>, <class 'AssertionError'>)
Exceptions which are always propagated unwrapped
- SUPPRESS_CONCURRENT = (<class 'usim._primitives.task.TaskCancelled'>, <class 'usim._primitives.task.TaskClosed'>, <class 'GeneratorExit'>)
Exceptions which are not re-raised from concurrent tasks
- do(payload: Coroutine[Any, Any, usim._primitives.context.RT], *, after: Optional[float] = None, at: Optional[float] = None, volatile: bool = False) usim._primitives.task.Task[usim._primitives.context.RT] [source]
Concurrently perform an activity in this scope
- Parameters
payload – the activity to perform
after – delay after which to start the activity
at – point in time at which to start the activity
volatile – whether the activity is aborted at the end of the scope
- Raises
ScopeClosed – if the scope has ended already
- Returns
representation of the ongoing activity
All non-volatile activities are awaited at the end of the scope. As a result, the scope only ends after all its child activities are done. Unhandled exceptions in children cause the parent scope to abort immediately; all child exceptions are collected and re-raised as part of a single Concurrent exception in this case.

If an activity needs to shut down gracefully with its scope, it can await the scope.

    async def graceful(containing_scope: Scope):
        print('waiting for end of scope ...')
        await containing_scope
        print('... scope has finished')

    async with Scope() as scope:
        scope.do(graceful(scope))

All volatile activities are aborted at the end of the scope, after all non-volatile activities have finished. Aborting volatile activities is not graceful: GeneratorExit is raised in the activity, which must exit without awaiting or yielding anything.

The scope assumes exclusive ownership of the payload: no activity should modify the payload directly. The scope takes the responsibility to await and clean up the payload as needed.

It is not possible to do() activities after the scope has ended. A ScopeClosed exception is raised in this case.
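
The exit order described above (block first, then all non-volatile activities, then aborting the volatile ones) can be illustrated with a rough analogy in plain asyncio. This is a sketch under stated assumptions, not μSim code: MiniScope, worker, ticker and main are hypothetical names, real time replaces simulated time, volatile activities are aborted via asyncio cancellation instead of GeneratorExit, and error handling is left out entirely.

```python
import asyncio


class MiniScope:
    """Hypothetical asyncio stand-in for a scope; NOT the usim implementation"""

    def __init__(self):
        self._required = []  # non-volatile children, awaited on exit
        self._volatile = []  # volatile children, aborted on exit

    async def __aenter__(self):
        return self

    def do(self, payload, *, volatile=False):
        # schedule the payload to run concurrently with the block
        task = asyncio.ensure_future(payload)
        (self._volatile if volatile else self._required).append(task)
        return task

    async def __aexit__(self, exc_type, exc, tb):
        # 1. wait until every non-volatile child is done
        if self._required:
            await asyncio.gather(*self._required)
        # 2. only then abort the volatile children
        for task in self._volatile:
            task.cancel()
        await asyncio.gather(*self._volatile, return_exceptions=True)
        return False  # exceptions from the block are not handled in this sketch


async def worker(log, name, delay):
    await asyncio.sleep(delay)
    log.append(name)


async def ticker(log):
    try:
        while True:  # would never finish on its own
            await asyncio.sleep(3600)
    finally:
        log.append("ticker aborted")


async def main():
    log = []
    async with MiniScope() as scope:
        scope.do(worker(log, "a", 0.01))
        scope.do(worker(log, "b", 0.02))
        scope.do(ticker(log), volatile=True)
        log.append("body done")
    log.append("scope exited")
    return log
```

Running asyncio.run(main()) returns the log of events in the order they occurred; the endless ticker does not keep the block from exiting because it was marked volatile.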
- async with usim.until(notification: usim._primitives.notification.Notification)[source]
Scope that is interrupted on notification

An asynchronous until-scope listens for a notification without stopping execution. This allows the notification to interrupt the scope at any break point, e.g. an await in the context or while waiting for children.

    async with until(done):
        await eternity  # infinite waiting, interrupted by notification

    async with until(done) as scope:
        scope.do(eternity)  # infinite activity, interrupted by notification

- Note
A break point in the context is always required, even when the notification would trigger immediately.
Managing Task Lifetime
- class usim.typing.Task(...)[source]
Concurrently running activity
A Task wraps a payload activity that is concurrently run in a parent Scope. This makes it possible to store or pass on the Task in order to control the underlying activity. Other activities can await a Task to receive results or exceptions on completion, similar to a regular activity.

    async def my_activity(delay):
        await (time + delay)
        return delay

    await my_activity(delay=5)  # await an activity

    async with Scope() as scope:
        task = scope.do(my_activity(delay=5))
        await task  # await Task of an activity

In contrast to a bare activity, it is possible to await the result of a Task multiple times, to cancel() a Task, and to await that a Task is done.
- Note
This class should not be instantiated directly. Always use a Scope to create it.
- cancel(*token) None [source]
Cancel this task during the current time step
If the Task is running, a CancelTask is raised once the activity suspends. The activity may catch and react to CancelTask, but should not suppress it.

If the Task is done before CancelTask is raised, the cancellation is ignored. This also means that cancelling an activity multiple times is allowed, but only the first successful cancellation is stored as the cancellation cause.

If the Task has not started running, it is cancelled immediately. This prevents any code execution, even before the first suspension.
- Warning
The timing of cancelling a Task before it started running may change in the future.
- property done: usim._primitives.task.Done
Condition whether the Task has stopped running. This includes completion, cancellation and failure.
- property status: usim._primitives.task.TaskState
The current status of this activity
- class usim.TaskState(value)[source]
State of a Task
- CANCELLED = 4
finished due to cancellation
- CREATED = 1
created but not running yet
- FAILED = 8
finished due to an unhandled exception
- FINISHED = 28
finished by any means
- RUNNING = 2
being executed at the moment
- SUCCESS = 16
finished normally
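
The listed values are powers of two, and FINISHED = 28 equals CANCELLED + FAILED + SUCCESS (4 + 8 + 16). This suggests that the states combine bit-wise. The following is a minimal sketch of that reading using the standard library enum.Flag; TaskStateSketch and is_finished are hypothetical names, and whether usim.TaskState is actually built this way is an assumption.

```python
import enum


class TaskStateSketch(enum.Flag):
    """Hypothetical stand-in that mirrors the documented values"""
    CREATED = 1
    RUNNING = 2
    CANCELLED = 4
    FAILED = 8
    SUCCESS = 16
    # assumption: "finished by any means" is the union of the three final states
    FINISHED = CANCELLED | FAILED | SUCCESS


def is_finished(state: TaskStateSketch) -> bool:
    """Check whether a state is any of the final states"""
    return bool(state & TaskStateSketch.FINISHED)
```

Under this reading, a single bit-wise test against FINISHED covers cancellation, failure and success alike, without comparing against each final state separately.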
- exception usim.VolatileTaskClosed[source]
A volatile Task forcefully exited at the end of its scope
- exception usim.CancelTask(subject: usim._primitives.task.Task, *token)[source]
A Task is being cancelled
- subject
the Task being cancelled
- exception usim.TaskCancelled(subject: usim._primitives.task.Task, *token)[source]
A Task has been cancelled
- subject
the cancelled Task
Handling Concurrent Failure
- exception usim.Concurrent(*children: Union[usim._primitives.concurrent_exception.Concurrent, Exception])[source]
Exception from one or more concurrent activities

A meta-exception that represents any Exception of any failing Task of a Scope. This does not include any Exception thrown in the body of the scope. As a result, it is possible to separately handle concurrent and regular exceptions:

    try:
        async with Scope() as scope:
            if random.random() < 0.5:
                scope.do(
                    async_raise(RuntimeError('concurrent'))
                )
            else:
                raise RuntimeError('scoped')
    except RuntimeError:
        print('Failed in body')
    except Concurrent:
        print('Failed in child')

In addition to separating concurrent and regular exceptions, Concurrent can also separate different concurrent exception types. Subscripting the Concurrent type as Concurrent[Exception] specialises except clauses to a specific concurrent Exception:

    try:
        async with Scope() as scope:
            if random.random() < 0.333:
                scope.do(async_raise(KeyError('concurrent')))
            elif random.random() < 0.5:
                scope.do(async_raise(IndexError('concurrent')))
            else:
                scope.do(async_raise(ValueError('concurrent')))
    except Concurrent[KeyError]:
        print('Failed key lookup')
    except Concurrent[IndexError]:
        print('Failed indexing')
    except (Concurrent[TypeError], Concurrent[ValueError]):
        print('Incorrect type/value of something!')

Since a Scope can run more than one Task concurrently, there can be more than one exception as well. Subscripting Concurrent is possible for several types at once: Concurrent[ExceptionA, ExceptionB] matches only ExceptionA and ExceptionB at the same time, and Concurrent[ExceptionA, ExceptionB, ...] matches at least ExceptionA and ExceptionB at the same time.

    try:
        async with Scope() as scope:
            scope.do(async_raise(KeyError('concurrent')))
            if random.random() < 0.5:
                scope.do(async_raise(IndexError('concurrent')))
            if random.random() < 0.5:
                scope.do(async_raise(ValueError('concurrent')))
    except Concurrent[KeyError]:
        print('Failed only key lookup')
    except Concurrent[KeyError, IndexError]:
        print('Failed key lookup and indexing')
    except Concurrent[KeyError, ...]:
        print('Failed key lookup and something else')

Note that except (Concurrent[A], Concurrent[B]): means either A or B whereas except Concurrent[A, B]: means both A and B.
- children: Tuple[Union[usim._primitives.concurrent_exception.Concurrent, Exception], ...]
Exceptions that occurred concurrently
- flattened() usim._primitives.concurrent_exception.Concurrent [source]
Collapse nested Concurrent exceptions
Recursively collapses nested Concurrent exceptions to provide a single Concurrent exception containing all children of the hierarchy. For example, flattening a Concurrent(Concurrent(KeyError()), IndexError()) provides a Concurrent(KeyError(), IndexError()).
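
The recursive collapse can be sketched with a small stand-in class. ConcurrentSketch and flatten are hypothetical names used for illustration only; the sketch assumes that children are kept in order and that only the nesting is removed, which may differ in detail from what flattened() does.

```python
class ConcurrentSketch(Exception):
    """Hypothetical stand-in for a meta-exception holding child exceptions"""

    def __init__(self, *children):
        super().__init__(*children)
        self.children = children


def flatten(exc: ConcurrentSketch) -> ConcurrentSketch:
    """Collapse nested ConcurrentSketch children into a single level"""
    leaves = []
    for child in exc.children:
        if isinstance(child, ConcurrentSketch):
            # recurse into nested meta-exceptions and adopt their leaves
            leaves.extend(flatten(child).children)
        else:
            leaves.append(child)
    return ConcurrentSketch(*leaves)


nested = ConcurrentSketch(ConcurrentSketch(KeyError()), IndexError())
flat = flatten(nested)
flat_types = [type(child) for child in flat.children]
```

Here flat_types holds the types of the leaf exceptions, with the intermediate ConcurrentSketch layer removed.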
- specialisations: ClassVar[Optional[Tuple[Type[Exception], ...]]] = None
Specialisations this type expects in order to match
- template
Basic template of specialisation
See also
For the use of AssertionError by μSim, see ./debug.