Branching Tasks and Concurrent Exceptions

Branching off from an activity is only possible in a well-defined Scope. Each scope is bound to the lifetime of any child Task to prevent stray concurrency. Exceptions occurring concurrently are collected by the scope into a single Concurrent exception.

Scoping Concurrency

class usim.Scope

Concurrency scope that allows branching off and waiting for multiple activities

A new Scope must be opened in an async with block. During its block, a Scope may do() several activities concurrently. The Scope owns and supervises all branched off activities.

async def show_clock(interval=1):
    "An infinite loop showing the current time"
    async for now in every(interval=interval):
        print(now)

async with Scope() as scope:
    scope.do(time + 20)  # scope can launch multiple activities at once...
    scope.do(time + 20)
    scope.do(time + 20)
    scope.do(
        show_clock(),
        volatile=True
    )  #  ...and mark some as expendable on exit
    # block is exited once the three delays finished concurrently
# block is done after a total delay of 20

Both the block of scope and all its activities form one unit of control. A Scope will only exit once its block and all non-volatile activities are done. If either encounters an unhandled exception, all are aborted; exceptions from child tasks are collapsed into a single Concurrent exception which is raised by the Scope. Only the fatal exception types SystemExit, KeyboardInterrupt, and AssertionError are not collapsed, but propagated directly.

During its lifetime, a Scope can be passed around freely. Most importantly, it can be passed to child activities. This allows them to do() things in a parent scope, and to await the end of the scope.

def do_some(scope):
    "Perform several actions in a parent scope"
    for delay in range(0, 20, 5):
        scope.do(time + delay)

async def on_done(scope):
    "Wait for a scope to end and report it"
    await scope
    print('Scope is done at', time.now)

async with Scope() as scope:
    do_some(scope)  # pass scope around to do activities in it
    scope.do(on_done(scope))  # pass scope around to await its end

PROMOTE_CONCURRENT = (<class 'SystemExit'>, <class 'KeyboardInterrupt'>, <class 'AssertionError'>)

Exceptions which are always propagated unwrapped

SUPPRESS_CONCURRENT = (<class 'usim._primitives.task.TaskCancelled'>, <class 'usim._primitives.task.TaskClosed'>, <class 'GeneratorExit'>)

Exceptions which are not re-raised from concurrent tasks
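
The interplay of the two tuples can be pictured with a small stand-alone sketch. It does not import μSim: ConcurrentSketch, TaskCancelledSketch and collapse are hypothetical stand-ins, and the order in which a real Scope inspects child exceptions is an assumption.

```python
class TaskCancelledSketch(Exception):
    """Hypothetical stand-in for usim's TaskCancelled"""


class ConcurrentSketch(Exception):
    """Hypothetical stand-in for usim's Concurrent"""
    def __init__(self, *children):
        super().__init__(*children)
        self.children = children


# mirrors the documented PROMOTE_CONCURRENT
PROMOTE = (SystemExit, KeyboardInterrupt, AssertionError)
# simplified mirror of SUPPRESS_CONCURRENT, using the stand-in above
SUPPRESS = (TaskCancelledSketch, GeneratorExit)


def collapse(child_exceptions):
    """Return the exception a scope would raise, or None if there is none"""
    # suppressed exceptions never count as failures of the scope
    failures = [
        exc for exc in child_exceptions if not isinstance(exc, SUPPRESS)
    ]
    # fatal exceptions are handed on as they are, without wrapping
    for exc in failures:
        if isinstance(exc, PROMOTE):
            return exc
    if not failures:
        return None
    # everything else is reported as one collective exception
    return ConcurrentSketch(*failures)


result = collapse([KeyError("a"), TaskCancelledSketch(), IndexError(1)])
print(type(result).__name__, [type(c).__name__ for c in result.children])
```

In this sketch the cancellation is dropped silently, while the two lookup errors end up as children of one collective exception.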

do(payload: Coroutine[Any, Any, usim._primitives.context.RT], *, after: Optional[float] = None, at: Optional[float] = None, volatile: bool = False) -> usim._primitives.task.Task[usim._primitives.context.RT]

Concurrently perform an activity in this scope

Parameters
  • payload – the activity to perform

  • after – delay after which to start the activity

  • at – point in time at which to start the activity

  • volatile – whether the activity is aborted at the end of the scope

Raises

ScopeClosed – if the scope has ended already

Returns

representation of the ongoing activity

All non-volatile activities are awaited at the end of the scope. As a result, the scope only ends after all its child activities are done. Unhandled exceptions in children cause the parent scope to abort immediately; all child exceptions are collected and re-raised as part of a single Concurrent exception in this case.

If an activity needs to shut down gracefully with its scope, it can await the scope.

async def graceful(containing_scope: Scope):
    print('waiting for end of scope ...')
    await containing_scope
    print('... scope has finished')

async with Scope() as scope:
    scope.do(graceful(scope))

All volatile activities are aborted at the end of the scope, after all non-volatile activities have finished. Aborting volatile activities is not graceful: GeneratorExit is raised in the activity, which must exit without awaiting or yielding anything.

The scope assumes exclusive ownership of the payload: no activity should modify the payload directly. The scope takes responsibility for awaiting and cleaning up the payload as needed.

It is not possible to do() activities after the scope has ended. A ScopeClosed exception is raised in this case.

async with usim.until(notification: usim._primitives.notification.Notification)

Scope that is interrupted on notification

An asynchronous until-scope listens for a notification without stopping execution. This allows notification on any break point, e.g. await in the context or while waiting for children.

async with until(done):
    await eternity  # infinite waiting, interrupted by notification

async with until(done) as scope:
    scope.do(eternity)  # infinite activity, interrupted by notification

Note

A break point in the context is always required, even when the notification would trigger immediately.

Managing Task Lifetime

class usim.typing.Task(...)

Concurrently running activity

A Task wraps a payload activity that is concurrently run in a parent Scope. The Task can be stored or passed on in order to control the underlying activity. Other activities can await a Task to receive results or exceptions on completion, similar to a regular activity.

async def my_activity(delay):
    await (time + delay)
    return delay

await my_activity(delay=10)  # await an activity

async with Scope() as scope:
    task = scope.do(my_activity(delay=10))
    await task   # await Task of an activity

In contrast to a bare activity, it is possible to cancel() a Task before completion and to await that a Task is done.

Note

This class should not be instantiated directly. Always use a Scope to create it.

cancel(*token) -> None

Cancel this task during the current time step

If the Task is running, a CancelTask is raised once the activity suspends. The activity may catch and react to CancelTask, but should not suppress it.

If the Task is done before CancelTask is raised, the cancellation is ignored. This also means that cancelling an activity multiple times is allowed, but only the first successful cancellation is stored as the cancellation cause.

If the Task has not started running, it is cancelled immediately. This prevents any code execution, even before the first suspension.

Warning

The timing of cancelling a Task before it started running may change in the future.
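
The three cases described for cancel() can be traced with a toy state model. This is only a sketch of the rules as stated above: TaskModel and its methods are hypothetical and do not reflect how μSim implements Task, and ignoring a second cancel while one is pending is an assumption.

```python
class TaskModel:
    """Toy model of the documented cancellation rules, not usim's Task"""
    def __init__(self):
        self.status = "CREATED"
        self.cancellation = None  # tokens of the first successful cancel
        self._pending = None      # cancel requested, but not yet delivered

    def start(self):
        if self.status == "CREATED":
            self.status = "RUNNING"

    def cancel(self, *token):
        if self.status == "CREATED":
            # not started: cancelled immediately, no code ever runs
            self.status, self.cancellation = "CANCELLED", token
        elif self.status == "RUNNING" and self._pending is None:
            # running: delivered only once the activity suspends
            self._pending = token
        # done or already cancelled: the request is ignored

    def suspend(self):
        if self.status == "RUNNING" and self._pending is not None:
            self.status, self.cancellation = "CANCELLED", self._pending

    def finish(self):
        if self.status == "RUNNING":
            # done before delivery: the pending cancel is dropped
            self.status, self._pending = "SUCCESS", None


early, late = TaskModel(), TaskModel()
early.cancel("too early")  # cancelled before it ever started
late.start()
late.finish()
late.cancel("too late")    # ignored, the task is already done
print(early.status, late.status)
```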

property done: usim._primitives.task.Done

Condition whether the Task has stopped running. This includes completion, cancellation and failure.

property status: usim._primitives.task.TaskState

The current status of this activity

class usim.TaskState(value)

State of a Task

CANCELLED = 4

finished due to cancellation

CREATED = 1

created but not running yet

FAILED = 8

finished due to an unhandled exception

FINISHED = 28

finished by any means

RUNNING = 2

being executed at the moment

SUCCESS = 16

finished normally
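
The listed values behave like bit flags: FINISHED = 28 is exactly CANCELLED | FAILED | SUCCESS (4 + 8 + 16). The following sketch reproduces the values with a stand-in enum. TaskStateSketch and is_finished are hypothetical names, and that the real TaskState supports the same bitwise test is an assumption.

```python
import enum


class TaskStateSketch(enum.Flag):
    """Stand-in carrying the values documented for TaskState"""
    CREATED = 1
    RUNNING = 2
    CANCELLED = 4
    FAILED = 8
    SUCCESS = 16
    FINISHED = CANCELLED | FAILED | SUCCESS  # 4 + 8 + 16 == 28


def is_finished(state):
    """Check whether a state is any of the finished states"""
    return bool(state & TaskStateSketch.FINISHED)


print(TaskStateSketch.FINISHED.value)
print(is_finished(TaskStateSketch.FAILED), is_finished(TaskStateSketch.RUNNING))
```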

exception usim.TaskClosed

A Task forcefully exited

exception usim.VolatileTaskClosed

A volatile Task forcefully exited at the end of its scope

exception usim.CancelTask(subject: usim._primitives.task.Task, *token)

A Task is being cancelled

subject

the Task being cancelled

exception usim.TaskCancelled(subject: usim._primitives.task.Task, *token)

A Task has been cancelled

subject

the cancelled Task

Handling Concurrent Failure

exception usim.Concurrent(*children: Union[usim._primitives.concurrent_exception.Concurrent, Exception])

Exception from one or more concurrent activities

A meta-exception that represents any Exception of any failing Task of a Scope. This does not include any Exception thrown in the body of the scope. As a result, it is possible to separately handle concurrent and regular exceptions:

try:
    async with Scope() as scope:
        if random.random() < 0.5:
            scope.do(
                async_raise(RuntimeError('concurrent'))
            )
        else:
            raise RuntimeError('scoped')
except RuntimeError:
    print('Failed in body')
except Concurrent:
    print('Failed in child')

In addition to separating concurrent and regular exceptions, Concurrent can also separate different concurrent exception types. Subscribing the Concurrent type as Concurrent[Exception] specialises except clauses to a specific concurrent Exception:

try:
    async with Scope() as scope:
        if random.random() < 0.333:
            scope.do(async_raise(KeyError('concurrent')))
        elif random.random() < 0.5:
            scope.do(async_raise(IndexError('concurrent')))
        else:
            scope.do(async_raise(ValueError('concurrent')))
except Concurrent[KeyError]:
    print('Failed key lookup')
except Concurrent[IndexError]:
    print('Failed indexing')
except (Concurrent[TypeError], Concurrent[ValueError]):
    print('Incorrect type/value of something!')

Since a Scope can run more than one Task concurrently, there can be more than one exception as well. Subscribing Concurrent is possible for several types at once: Concurrent[ExceptionA, ExceptionB] matches only ExceptionA and ExceptionB at the same time, and Concurrent[ExceptionA, ExceptionB, ...] matches at least ExceptionA and ExceptionB at the same time.

try:
    async with Scope() as scope:
        scope.do(async_raise(KeyError('concurrent')))
        if random.random() < 0.5:
            scope.do(async_raise(IndexError('concurrent')))
        if random.random() < 0.5:
            scope.do(async_raise(ValueError('concurrent')))
except Concurrent[KeyError]:
    print('Failed only key lookup')
except Concurrent[KeyError, IndexError]:
    print('Failed key lookup and indexing')
except Concurrent[KeyError, ...]:
    print('Failed key lookup and something else')

Note that except (Concurrent[A], Concurrent[B]): means either A or B whereas except Concurrent[A, B]: means both A and B.
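
The difference between exact and open-ended specialisations can be written down as a plain predicate. This is a sketch of the rules as described here, not μSim's matching code: the function matches is hypothetical, and comparing children with isinstance is an assumption.

```python
def matches(children, specialisations, inclusive=False):
    """Sketch of whether ``children`` satisfy ``Concurrent[specialisations]``"""
    # every requested type must be present among the children
    for spec in specialisations:
        if not any(isinstance(child, spec) for child in children):
            return False
    if inclusive:
        # ``Concurrent[A, B, ...]``: additional exceptions are acceptable
        return True
    # ``Concurrent[A, B]``: no child may fall outside the requested types
    return all(isinstance(child, tuple(specialisations)) for child in children)


failures = [KeyError("k"), IndexError(1)]
print(matches(failures, (KeyError,)))                  # like Concurrent[KeyError]
print(matches(failures, (KeyError, IndexError)))       # like Concurrent[KeyError, IndexError]
print(matches(failures, (KeyError,), inclusive=True))  # like Concurrent[KeyError, ...]
```

With both a KeyError and an IndexError present, only the second and third checks succeed, which mirrors the order of the except clauses in the example above.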

children: Tuple[Union[usim._primitives.concurrent_exception.Concurrent, Exception], ...]

Exceptions that occurred concurrently

flattened() -> usim._primitives.concurrent_exception.Concurrent

Collapse nested Concurrent exceptions

Recursively collapses nested Concurrent exceptions to provide a single Concurrent exception containing all children of the hierarchy. For example, flattening a Concurrent(Concurrent(KeyError()), IndexError()) provides a Concurrent(KeyError(), IndexError()).
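
The recursion behind flattening can be sketched without μSim. ConcurrentSketch is a hypothetical stand-in for Concurrent that only stores its children. The actual implementation may differ in details such as which type the result has.

```python
class ConcurrentSketch(Exception):
    """Hypothetical stand-in for usim's Concurrent"""
    def __init__(self, *children):
        super().__init__(*children)
        self.children = children

    def flattened(self):
        """Collapse nested instances into a single, flat one"""
        flat = []
        for child in self.children:
            if isinstance(child, ConcurrentSketch):
                # recurse so that arbitrarily deep nesting is resolved
                flat.extend(child.flattened().children)
            else:
                flat.append(child)
        return ConcurrentSketch(*flat)


nested = ConcurrentSketch(ConcurrentSketch(KeyError()), IndexError())
flat = nested.flattened()
print([type(child).__name__ for child in flat.children])
```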

inclusive: ClassVar[bool] = True

Whether this type accepts additional unmatched specialisations

specialisations: ClassVar[Optional[Tuple[Type[Exception], ...]]] = None

Specialisations this type expects in order to match

template

Basic template of specialisation

alias of usim._primitives.concurrent_exception.Concurrent

See also

For the use of AssertionError by μSim, see also ./debug.