Branching Tasks and Concurrent Exceptions
Branching off from an activity is only possible in a
Scope. Each scope is bound to the lifetime of any child
Task to prevent stray concurrency.
Exceptions occurring concurrently are collected by the scope
into a single Concurrent exception.
- class usim.Scope
Concurrency scope that allows branching off and waiting for multiple activities
    from usim import Scope, time, every

    async def show_clock(interval=1):
        "An infinite loop showing the current time"
        async for now in every(interval=interval):
            print(now)

    async with Scope() as scope:
        scope.do(time + 20)  # scope can launch multiple activities at once...
        scope.do(time + 20)
        scope.do(time + 20)
        scope.do(
            show_clock(),
            volatile=True
        )  # ...and mark some as expendable on exit
        # block is exited once the three delays finished concurrently
    # block is done after a total delay of 20
Both the block of scope and all its activities form one unit of control. A
Scope will only exit once its block and all non-volatile activities are done.
If either encounters an unhandled exception, all are aborted; exceptions from child tasks are collapsed into a single
Concurrent exception which is raised by the Scope.
Only the fatal exception types SystemExit, KeyboardInterrupt and
AssertionError are not collapsed, but propagated directly.
    def do_some(scope):
        "Perform several actions in a parent scope"
        for delay in range(0, 20, 5):
            scope.do(time + delay)

    async def on_done(scope):
        "Wait for a scope to end and report it"
        await scope
        print('Scope is done at', time.now)

    async with Scope() as scope:
        do_some(scope)  # pass scope around to do activities in it
        scope.do(on_done(scope))  # pass scope around to await its end
- PROMOTE_CONCURRENT = (<class 'SystemExit'>, <class 'KeyboardInterrupt'>, <class 'AssertionError'>)
Exceptions which are always propagated unwrapped
- SUPPRESS_CONCURRENT = (<class 'usim._primitives.task.TaskCancelled'>, <class 'usim._primitives.task.TaskClosed'>, <class 'GeneratorExit'>)
Exceptions which are not re-raised from concurrent tasks
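One way to read the two tuples above is as a triage rule for failures of child tasks. The following is a minimal sketch under stated assumptions, not μSim's implementation: `triage`, `TaskCancelledSketch` and `TaskClosedSketch` are hypothetical names that stand in for the library's internals, and how the scope prioritises several promoted exceptions is not specified here.

```python
class TaskCancelledSketch(Exception):
    """Hypothetical stand-in for usim's internal TaskCancelled"""


class TaskClosedSketch(Exception):
    """Hypothetical stand-in for usim's internal TaskClosed"""


# Mirrors the documented tuples, using the stand-ins above
PROMOTE = (SystemExit, KeyboardInterrupt, AssertionError)
SUPPRESS = (TaskCancelledSketch, TaskClosedSketch, GeneratorExit)


def triage(failures):
    """Split child failures into (promoted, collected); suppressed ones are dropped"""
    promoted, collected = [], []
    for failure in failures:
        if isinstance(failure, PROMOTE):
            promoted.append(failure)   # would be propagated unwrapped
        elif isinstance(failure, SUPPRESS):
            continue                   # would not be re-raised at all
        else:
            collected.append(failure)  # would become a child of Concurrent
    return promoted, collected


promoted, collected = triage(
    [KeyError('a'), GeneratorExit(), AssertionError('b'), TaskClosedSketch()]
)
print([type(e).__name__ for e in promoted])
print([type(e).__name__ for e in collected])
```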
- do(payload: Coroutine[Any, Any, usim._primitives.context.RT], *, after: Optional[float] = None, at: Optional[float] = None, volatile: bool = False) → usim._primitives.task.Task[usim._primitives.context.RT]
Concurrently perform an activity in this scope
payload – the activity to perform
after – delay after which to start the activity
at – point in time at which to start the activity
volatile – whether the activity is aborted at the end of the scope
Raises: ScopeClosed – if the scope has ended already
Returns: representation of the ongoing activity
All non-volatile activities are awaited at the end of the scope. As a result, the scope only ends after all its child activities are done. Unhandled exceptions in children cause the parent scope to abort immediately; all child exceptions are collected and re-raised as part of a single
Concurrent exception in this case.
If an activity needs to shut down gracefully with its scope, it can await the scope.
    async def graceful(containing_scope: Scope):
        print('waiting for end of scope ...')
        await containing_scope
        print('... scope has finished')

    async with Scope() as scope:
        scope.do(graceful(scope))
volatile activities are aborted at the end of the scope, after all non-volatile
activities have finished. Aborting volatile activities is not graceful:
GeneratorExit is raised in the activity, which must exit without awaiting or yielding anything.
The scope assumes exclusive ownership of the payload:
no activity should modify the payload directly.
The scope takes the responsibility to await and clean up the payload as needed.
It is not possible to do() activities after the scope has ended. A
ScopeClosed exception is raised in this case.
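The non-graceful abort of volatile activities can be illustrated with plain Python coroutines, independent of μSim. This sketch assumes the abort behaves like `coroutine.close()`, which raises GeneratorExit inside the coroutine; `suspend`, `well_behaved` and `misbehaving` are hypothetical names used only for illustration.

```python
import types


@types.coroutine
def suspend():
    """Suspend the calling coroutine once (stand-in for any break point)"""
    yield


async def well_behaved(log):
    try:
        await suspend()
    finally:
        log.append('cleaned up')  # synchronous cleanup is fine


async def misbehaving():
    try:
        await suspend()
    except GeneratorExit:
        await suspend()  # suspending again during the abort is an error


log = []

good = well_behaved(log)
good.send(None)  # run up to the first suspension
good.close()     # raises GeneratorExit inside the coroutine

bad = misbehaving()
bad.send(None)
try:
    bad.close()
except RuntimeError:
    log.append('rejected')  # Python refuses a coroutine that ignores GeneratorExit

print(log)
```

Cleanup in a `finally` block therefore has to be synchronous; anything that needs to await during shutdown is better written as a non-volatile activity that awaits its scope.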
- async with usim.until(notification: usim._primitives.notification.Notification)
Scope that is interrupted on notification
An asynchronous until-scope listens for a notification without stopping execution. This allows notification on any break point, e.g. await in the context or while waiting for children.
    async with until(done):
        await eternity  # infinite waiting, interrupted by notification

    async with until(done) as scope:
        scope.do(eternity)  # infinite activity, interrupted by notification
A break point in the context is always required, even when the notification would trigger immediately.
Managing Task Lifetime
- class usim.typing.Task(...)
Concurrently running activity
A Task wraps a payload activity that is concurrently run in a
Scope. This allows storing or passing on the
Task in order to control the underlying activity. Other activities can
await a Task to receive results or exceptions on completion, similar to a regular activity.
    from usim import Scope, time

    async def my_activity(delay):
        await (time + delay)
        return delay

    await my_activity(20)  # await an activity

    async with Scope() as scope:
        task = scope.do(my_activity(20))
        await task  # await Task of an activity
In contrast to a bare activity, it is possible to
await the result of a Task multiple times.
This class should not be instantiated directly. Always use a
Scope to create it.
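The difference between a bare activity and a task that can be awaited repeatedly also exists in the standard library, which makes it easy to try out. The sketch below uses asyncio rather than μSim, so it is an analogy only: `asyncio.create_task` stands in for `Scope.do`, and it assumes that μSim activities behave like ordinary coroutines in this respect.

```python
import asyncio


async def my_activity(delay):
    await asyncio.sleep(delay)
    return delay


async def main():
    results = []
    # A task object stores its result and can be awaited repeatedly
    task = asyncio.create_task(my_activity(0))
    results.append(await task)
    results.append(await task)
    # A bare coroutine is used up by the first await
    bare = my_activity(0)
    results.append(await bare)
    try:
        await bare
    except RuntimeError:
        results.append('reuse rejected')
    return results


results = asyncio.run(main())
print(results)
```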
- cancel(*token) → None
Cancel this task during the current time step
If the Task completes before CancelTask is raised, the cancellation is ignored. This also means that cancelling an activity multiple times is allowed, but only the first successful cancellation is stored as the cancellation cause.
If the Task has not started running, it is cancelled immediately. This prevents any code execution, even before the first suspension.
The timing of cancelling a Task before it started running may change in the future.
- property done: usim._primitives.task.Done
Whether the Task has stopped running. This includes completion, cancellation and failure.
- class usim.TaskState(value)
State of a Task
- CANCELLED = 4
finished due to cancellation
- CREATED = 1
created but not running yet
- FAILED = 8
finished due to an unhandled exception
- FINISHED = 28
finished by any means
- RUNNING = 2
being executed at the moment
- SUCCESS = 16
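The numeric values listed above are consistent with bit flags: 4 | 8 | 16 equals 28, so FINISHED reads as the union of CANCELLED, FAILED and SUCCESS. The following sketch reproduces that arithmetic with a stand-in enum; `TaskStateSketch` and `is_finished` are hypothetical names, and whether μSim defines its states this way is an assumption.

```python
import enum


class TaskStateSketch(enum.Flag):
    """Hypothetical stand-in that reuses the documented values"""
    CREATED = 1
    RUNNING = 2
    CANCELLED = 4
    FAILED = 8
    SUCCESS = 16
    FINISHED = CANCELLED | FAILED | SUCCESS  # 4 | 8 | 16 == 28


def is_finished(state):
    """Check whether a state is any of the finished states"""
    return bool(state & TaskStateSketch.FINISHED)


print(TaskStateSketch.FINISHED.value)
print(is_finished(TaskStateSketch.FAILED), is_finished(TaskStateSketch.RUNNING))
```

With this layout, a single bitwise test covers "finished by any means" without enumerating the individual outcomes.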
- exception usim.CancelTask(subject: usim._primitives.task.Task, *token)
A Task is being cancelled
subject – the Task being cancelled
Handling Concurrent Failure
- exception usim.Concurrent(*children: Union[usim._primitives.concurrent_exception.Concurrent, Exception])
Exception from one or more concurrent activities
A meta-exception that represents any
Exception of any failing Task of a
Scope. This does not include any
Exception thrown in the body of the scope. As a result, it is possible to separately handle concurrent and regular exceptions:
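    import random
    from usim import Scope, Concurrent

    # async_raise is assumed to be a coroutine function that raises the given exception
    try:
        async with Scope() as scope:
            if random.random() < 0.5:
                scope.do(async_raise(RuntimeError('concurrent')))
            else:
                raise RuntimeError('scoped')
    except RuntimeError:
        print('Failed in body')
    except Concurrent:
        print('Failed in child')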
In addition to separating concurrent and regular exceptions,
Concurrent can also separate different concurrent exception types. Subscribing the
Concurrent type, as in Concurrent[KeyError], restricts
except clauses to a specific concurrent exception type:
    try:
        async with Scope() as scope:
            if random.random() < 0.333:
                scope.do(async_raise(KeyError('concurrent')))
            elif random.random() < 0.5:
                scope.do(async_raise(IndexError('concurrent')))
            else:
                scope.do(async_raise(ValueError('concurrent')))
    except Concurrent[KeyError]:
        print('Failed key lookup')
    except Concurrent[IndexError]:
        print('Failed indexing')
    except (Concurrent[TypeError], Concurrent[ValueError]):
        print('Incorrect type/value of something!')
Since a Scope can run more than one
Task concurrently, there can be more than one exception as well. Subscribing
Concurrent is possible for several types at once:
Concurrent[ExceptionA, ExceptionB] matches only
ExceptionA and ExceptionB at the same time, and
Concurrent[ExceptionA, ExceptionB, ...] matches at least
ExceptionA and ExceptionB at the same time.
    try:
        async with Scope() as scope:
            scope.do(async_raise(KeyError('concurrent')))
            if random.random() < 0.5:
                scope.do(async_raise(IndexError('concurrent')))
            if random.random() < 0.5:
                scope.do(async_raise(ValueError('concurrent')))
    except Concurrent[KeyError]:
        print('Failed only key lookup')
    except Concurrent[KeyError, IndexError]:
        print('Failed key lookup and indexing')
    except Concurrent[KeyError, ...]:
        print('Failed key lookup and something else')
except (Concurrent[A], Concurrent[B]): means either A or B, whereas
except Concurrent[A, B]: means both A and B.
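The matching rules for specialised Concurrent types can be restated as a small predicate over the child exceptions. This is a sketch of the documented rule only, not μSim's implementation: `matches` is a hypothetical helper, and treating subclasses as matches via `isinstance` is an assumption.

```python
def matches(children, specialisations):
    """Check child exceptions against types, with an optional trailing Ellipsis"""
    open_ended = bool(specialisations) and specialisations[-1] is Ellipsis
    required = specialisations[:-1] if open_ended else specialisations
    # Every requested type must occur among the children ...
    all_required_present = all(
        any(isinstance(child, spec) for child in children) for spec in required
    )
    if open_ended:
        return all_required_present  # "at least" these types
    # ... and without "...", no child may be of any other type
    no_extra = all(
        any(isinstance(child, spec) for spec in required) for child in children
    )
    return all_required_present and no_extra


both = (KeyError(), IndexError())
print(matches(both, (KeyError,)))             # like Concurrent[KeyError]
print(matches(both, (KeyError, IndexError)))  # like Concurrent[KeyError, IndexError]
print(matches(both, (KeyError, ...)))         # like Concurrent[KeyError, ...]
```

Because an open-ended specialisation also matches the exact case, the order of `except` clauses matters: more specific clauses go first.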
- children: Tuple[Union[usim._primitives.concurrent_exception.Concurrent, Exception], ...]
Exceptions that occurred concurrently
- flattened() → usim._primitives.concurrent_exception.Concurrent
Collapse nested Concurrent exceptions
Recursively collapses nested
Concurrent exceptions to provide a single
Concurrent exception containing all
children of the hierarchy. For example, flattening a
Concurrent(Concurrent(KeyError()), IndexError()) provides a
Concurrent(KeyError(), IndexError()).
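The recursive collapse can be sketched with a small stand-in class. `ConcurrentSketch` is hypothetical and only models the `children` attribute and `flattened()` method described here; it is not the library's class.

```python
class ConcurrentSketch(Exception):
    """Hypothetical stand-in for Concurrent, holding child exceptions"""

    def __init__(self, *children):
        super().__init__(*children)
        self.children = children

    def flattened(self):
        """Return a new instance whose children are all leaf exceptions"""
        leaves = []
        for child in self.children:
            if isinstance(child, ConcurrentSketch):
                # Descend into nested groups and splice in their leaves
                leaves.extend(child.flattened().children)
            else:
                leaves.append(child)
        return ConcurrentSketch(*leaves)


nested = ConcurrentSketch(ConcurrentSketch(KeyError()), IndexError())
flat = nested.flattened()
print([type(child).__name__ for child in flat.children])
```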
- specialisations: ClassVar[Optional[Tuple[Type[Exception], ...]]] = None
Specialisations this type expects in order to match
For the use of AssertionError by μSim, see also ./debug.