Branching Tasks and Concurrent Exceptions
Branching off from an activity is only possible in
a well-defined Scope. The lifetime of every child Task is bound
to its scope, which prevents stray concurrency.
Exceptions occurring concurrently are collected by the scope
into a single Concurrent exception.
Scoping Concurrency
- class usim.Scope
Concurrency scope that allows branching off and waiting for multiple activities
A new Scope must be opened in an async with block. During its block, a Scope may do() several activities concurrently. The Scope owns and supervises all branched off activities.

    async def show_clock(interval=1):
        "An infinite loop showing the current time"
        async for now in every(interval=interval):
            print(now)

    async with Scope() as scope:
        scope.do(time + 20)  # scope can launch multiple activities at once...
        scope.do(time + 20)
        scope.do(time + 20)
        scope.do(
            show_clock(),
            volatile=True
        )  # ...and mark some as expendable on exit
        # block is exited once the three delays finished concurrently
    # block is done after a total delay of 20

Both the block of the scope and all its activities form one unit of control. A Scope will only exit once its block and all non-volatile activities are done. If either encounters an unhandled exception, all are aborted; exceptions from child tasks are collapsed into a single Concurrent exception which is raised by the Scope. Only the fatal exception types SystemExit, KeyboardInterrupt, and AssertionError are not collapsed, but propagated directly.
During its lifetime, a Scope can be passed around freely. Most importantly, it can be passed to child activities. This allows them to do() things in a parent scope, and to await the end of the scope.

    def do_some(scope):
        "Perform several actions in a parent scope"
        for delay in range(0, 20, 5):
            scope.do(time + delay)

    async def on_done(scope):
        "Wait for a scope to end and report it"
        await scope
        print('Scope is done at', time.now)

    async with Scope() as scope:
        do_some(scope)  # pass scope around to do activities in it
        scope.do(on_done(scope))  # pass scope around to await its end

- PROMOTE_CONCURRENT = (<class 'SystemExit'>, <class 'KeyboardInterrupt'>, <class 'AssertionError'>)
Exceptions which are always propagated unwrapped
- SUPPRESS_CONCURRENT = (<class 'usim._primitives.task.TaskCancelled'>, <class 'usim._primitives.task.TaskClosed'>, <class 'GeneratorExit'>)
Exceptions which are not re-raised from concurrent tasks
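The effect of the two attributes above can be sketched in plain Python. This is only an illustration of the documented rule, not usim's implementation: SketchConcurrent, PROMOTE, SUPPRESS and summarise are hypothetical stand-ins, and the order in which a real Scope inspects its children is an assumption.

```python
class SketchConcurrent(Exception):
    """Hypothetical stand-in for usim.Concurrent, holding child exceptions"""
    def __init__(self, *children):
        super().__init__(*children)
        self.children = children


# stand-ins for the two class attributes listed above
PROMOTE = (SystemExit, KeyboardInterrupt, AssertionError)
SUPPRESS = (GeneratorExit,)  # usim also lists TaskCancelled and TaskClosed


def summarise(child_exceptions):
    """Return the exception a scope might raise for its children, or None"""
    collected = []
    for exc in child_exceptions:
        if isinstance(exc, SUPPRESS):
            continue  # not re-raised from concurrent tasks
        if isinstance(exc, PROMOTE):
            return exc  # propagated unwrapped
        collected.append(exc)
    # everything else is collapsed into a single concurrent exception
    return SketchConcurrent(*collected) if collected else None
```

In this sketch, a suppressed exception never shows up among the children, while a promoted exception replaces the collected result entirely.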
- do(payload: Coroutine[Any, Any, usim._primitives.context.RT], *, after: Optional[float] = None, at: Optional[float] = None, volatile: bool = False) → usim._primitives.task.Task[usim._primitives.context.RT]
Concurrently perform an activity in this scope
- Parameters
payload – the activity to perform
after – delay after which to start the activity
at – point in time at which to start the activity
volatile – whether the activity is aborted at the end of the scope
- Raises
ScopeClosed – if the scope has ended already
- Returns
representation of the ongoing activity
All non-volatile activities are awaited at the end of the scope. As a result, the scope only ends after all its child activities are done. Unhandled exceptions in children cause the parent scope to abort immediately; all child exceptions are collected and re-raised as part of a single Concurrent exception in this case.
If an activity needs to shut down gracefully with its scope, it can await the scope.

    async def graceful(containing_scope: Scope):
        print('waiting for end of scope ...')
        await containing_scope
        print('... scope has finished')

    async with Scope() as scope:
        scope.do(graceful(scope))

All volatile activities are aborted at the end of the scope, after all non-volatile activities have finished. Aborting volatile activities is not graceful: GeneratorExit is raised in the activity, which must exit without awaiting or yielding anything.
The scope assumes exclusive ownership of the payload: no activity should modify the payload directly. The scope takes the responsibility to await and clean up the payload as needed.
It is not possible to do() activities after the scope has ended. A ScopeClosed exception is raised in this case.
- async with usim.until(notification: usim._primitives.notification.Notification)
Scope that is interrupted on notification
An asynchronous until-scope listens for a notification without stopping execution. This allows notification on any break point, e.g. await in the context or while waiting for children.

    async with until(done):
        await eternity  # infinite waiting, interrupted by notification

    async with until(done) as scope:
        scope.do(eternity)  # infinite activity, interrupted by notification

- Note
A break point in the context is always required, even when the notification would trigger immediately.
Managing Task Lifetime
- class usim.typing.Task(...)
Concurrently running activity
A Task wraps a payload activity that is concurrently run in a parent Scope. This allows storing or passing on the Task in order to control the underlying activity. Other activities can await a Task to receive results or exceptions on completion, similar to a regular activity.

    async def my_activity(delay):
        await (time + delay)
        return delay

    await my_activity(delay=5)  # await an activity (5 is an arbitrary example delay)

    async with Scope() as scope:
        task = scope.do(my_activity(delay=5))
        await task  # await Task of an activity

In contrast to a bare activity, it is possible to await the result of a Task multiple times.
- Note
This class should not be instantiated directly. Always use a Scope to create it.
- cancel(*token) → None
Cancel this task during the current time step
If the Task is running, a CancelTask is raised once the activity suspends. The activity may catch and react to CancelTask, but should not suppress it.
If the Task is done before CancelTask is raised, the cancellation is ignored. This also means that cancelling an activity multiple times is allowed, but only the first successful cancellation is stored as the cancellation cause.
If the Task has not started running, it is cancelled immediately. This prevents any code execution, even before the first suspension.
- Warning
The timing of cancelling a Task before it started running may change in the future.
- property done: usim._primitives.task.Done
Condition whether the Task has stopped running. This includes completion, cancellation and failure.
- property status: usim._primitives.task.TaskState
The current status of this activity
- class usim.TaskState(value)
State of a Task
- CANCELLED = 4
finished due to cancellation
- CREATED = 1
created but not running yet
- FAILED = 8
finished due to an unhandled exception
- FINISHED = 28
finished by any means
- RUNNING = 2
being executed at the moment
- SUCCESS = 16
finished normally
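The values listed above are consistent with a bit-flag layout in which FINISHED combines CANCELLED, FAILED and SUCCESS (4 + 8 + 16 = 28). The following sketch rebuilds that layout with a stand-in enum; SketchTaskState and is_finished are hypothetical names, and whether usim itself defines TaskState as a flag enum is an assumption.

```python
import enum


class SketchTaskState(enum.Flag):
    """Stand-in mirroring the values listed above; not usim's own class"""
    CREATED = 1
    RUNNING = 2
    CANCELLED = 4
    FAILED = 8
    SUCCESS = 16
    FINISHED = CANCELLED | FAILED | SUCCESS  # 4 + 8 + 16 == 28


def is_finished(state):
    """Check whether a state is one of the three ways of finishing"""
    return bool(state & SketchTaskState.FINISHED)
```

Under this reading, a single bitwise test covers all three final states, for example is_finished(SketchTaskState.FAILED).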
- exception usim.VolatileTaskClosed
A volatile Task forcefully exited at the end of its scope
- exception usim.CancelTask(subject: usim._primitives.task.Task, *token)
A Task is being cancelled
- subject
the Task being cancelled
- exception usim.TaskCancelled(subject: usim._primitives.task.Task, *token)
A Task has been cancelled
- subject
the cancelled Task
Handling Concurrent Failure
- exception usim.Concurrent(*children: Union[usim._primitives.concurrent_exception.Concurrent, Exception])
Exception from one or more concurrent activities
A meta-exception that represents any Exception of any failing Task of a Scope. This does not include any Exception thrown in the body of the scope. As a result, it is possible to separately handle concurrent and regular exceptions:

    try:
        async with Scope() as scope:
            if random.random() < 0.5:
                scope.do(
                    async_raise(RuntimeError('concurrent'))
                )
            else:
                raise RuntimeError('scoped')
    except RuntimeError:
        print('Failed in body')
    except Concurrent:
        print('Failed in child')

In addition to separating concurrent and regular exceptions, Concurrent can also separate different concurrent exception types. Subscripting the Concurrent type as Concurrent[Exception] specialises except clauses to a specific concurrent Exception:

    try:
        async with Scope() as scope:
            if random.random() < 0.333:
                scope.do(async_raise(KeyError('concurrent')))
            elif random.random() < 0.5:
                scope.do(async_raise(IndexError('concurrent')))
            else:
                scope.do(async_raise(ValueError('concurrent')))
    except Concurrent[KeyError]:
        print('Failed key lookup')
    except Concurrent[IndexError]:
        print('Failed indexing')
    except (Concurrent[TypeError], Concurrent[ValueError]):
        print('Incorrect type/value of something!')

Since a Scope can run more than one Task concurrently, there can be more than one exception as well. Subscripting Concurrent is possible for several types at once: Concurrent[ExceptionA, ExceptionB] matches only ExceptionA and ExceptionB at the same time, and Concurrent[ExceptionA, ExceptionB, ...] matches at least ExceptionA and ExceptionB at the same time.

    try:
        async with Scope() as scope:
            scope.do(async_raise(KeyError('concurrent')))
            if random.random() < 0.5:
                scope.do(async_raise(IndexError('concurrent')))
            if random.random() < 0.5:
                scope.do(async_raise(ValueError('concurrent')))
    except Concurrent[KeyError]:
        print('Failed only key lookup')
    except Concurrent[KeyError, IndexError]:
        print('Failed key lookup and indexing')
    except Concurrent[KeyError, ...]:
        print('Failed key lookup and something else')

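One plausible reading of the matching rule for several types can be written down as a small predicate. This is a sketch of the rule as described in the text, not usim's implementation: the function matches and its parameters are hypothetical, and details such as how duplicate or subclassed exceptions are treated are assumptions.

```python
def matches(children, specialisations, inclusive=False):
    """
    Hypothetical check whether child exceptions fit a specialisation

    :param children: the exceptions that occurred concurrently
    :param specialisations: the exception types inside ``Concurrent[...]``
    :param inclusive: whether the subscript ended with ``...``
    """
    # every requested type must be matched by at least one child
    for spec in specialisations:
        if not any(isinstance(child, spec) for child in children):
            return False
    if inclusive:
        return True
    # without ``...`` there must be no child of any other type
    return all(isinstance(child, tuple(specialisations)) for child in children)
```

With a KeyError and an IndexError as children, this predicate rejects [KeyError] alone, accepts [KeyError, IndexError], and accepts [KeyError] once inclusive=True stands in for the trailing ellipsis.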
Note that except (Concurrent[A], Concurrent[B]): means either A or B whereas except Concurrent[A, B]: means both A and B.
- children: Tuple[Union[usim._primitives.concurrent_exception.Concurrent, Exception], ...]
Exceptions that occurred concurrently
- flattened() → usim._primitives.concurrent_exception.Concurrent
Collapse nested Concurrent exceptions
Recursively collapses nested Concurrent exceptions to provide a single Concurrent exception containing all children of the hierarchy. For example, flattening a Concurrent(Concurrent(KeyError()), IndexError()) provides a Concurrent(KeyError(), IndexError()).
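The recursive collapse described above can be illustrated with a stand-in class. SketchConcurrent is a hypothetical name used only for this sketch; it mimics the children attribute and the flattened() method but is not usim's Concurrent.

```python
class SketchConcurrent(Exception):
    """Hypothetical stand-in for usim.Concurrent"""
    def __init__(self, *children):
        super().__init__(*children)
        self.children = children

    def flattened(self):
        """Collapse nested SketchConcurrent children into one level"""
        leaves = []
        for child in self.children:
            if isinstance(child, SketchConcurrent):
                # recurse so that arbitrarily deep nesting is collapsed
                leaves.extend(child.flattened().children)
            else:
                leaves.append(child)
        return SketchConcurrent(*leaves)
```

Flattening SketchConcurrent(SketchConcurrent(KeyError()), IndexError()) in this sketch yields an instance whose children are the KeyError and the IndexError, in that order.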
- specialisations: ClassVar[Optional[Tuple[Type[Exception], ...]]] = None
Specialisations this type expects in order to match
- template
Basic template of specialisation
See also
For the use of AssertionError by μSim, see also ./debug.