from typing import Coroutine, List, TypeVar, Any, Optional, Tuple
from .._core.loop import Interrupt as CoreInterrupt
from .._core.handler import __USIM_STATE__
from .notification import Notification
from .flag import Flag
from .task import Task, TaskClosed, TaskCancelled, try_close
from .concurrent_exception import Concurrent
RT = TypeVar('RT')
[docs]class VolatileTaskClosed(TaskClosed):
"""A volatile :py:class:`~.Task` forcefully exited at the end of its scope"""
class CancelScope(CoreInterrupt):
"""A :py:class:`Scope` is being cancelled"""
__slots__ = ('subject',)
def __init__(self, subject: 'Scope', *token):
super().__init__(*token)
self.subject = subject
class ScopeClosed(RuntimeError):
"""A :py:class:`~.Scope` has been closed and cannot be used anymore"""
__slots__ = 'scope',
def __init__(self, scope):
#: scope which has been used after close
self.scope = scope
[docs]class Scope:
r"""
Concurrency scope that allows branching off and waiting for multiple activities
A new :py:class:`~.Scope` must be opened in an ``async with`` block.
During its block, a :py:class:`~.Scope` may :py:meth:`~.Scope.do`
several activities concurrently.
The :py:class:`~.Scope` owns and supervises all branched off activities.
.. code:: python3
async def show_clock(interval=1):
"An infinite loop showing the current time"
async for now in every(interval=interval):
print(now)
async with Scope() as scope:
scope.do(time + 20) # scope can launch multiple activities at once...
scope.do(time + 20)
scope.do(time + 20)
scope.do(
show_clock(),
volatile=True
) # ...and mark some as expendable on exit
# block is exited once the three delays finished concurrently
# block is done after a total delay of 20
Both the block of scope and all its activities form one unit of control.
A :py:class:`~.Scope` will only exit once its block and all non-``volatile``
activities are done.
If either encounters an unhandled exception, all are aborted;
exceptions from child tasks are collapsed into a single :py:exc:`~.Concurrent`
exception which is raised by the :py:class:`~.Scope`.
Only the fatal exception types :py:exc:`SystemExit`, :py:exc:`KeyboardInterrupt`,
and :py:exc:`AssertionError` are not collapsed, but propagated directly.
During its lifetime, a :py:class:`~.Scope` can be passed around freely.
Most importantly, it can be passed to child activities.
This allows to :py:meth:`~.Scope.do` things in a parent scope, and to ``await``
the end of the scope.
.. code:: python3
def do_some(scope):
"Perform several actions in a parent scope"
for delay in range(0, 20, 5):
scope.do(time + delay)
async def on_done(scope):
"Wait for a scope to end and report it"
await scope
print('Scope is done at', time.now)
async with Scope() as scope:
do_some(scope) # pass scope around to do activities in it
on_done(scope) # pass scope around to await its end
"""
__slots__ = '_children', '_body_done', '_activity', '_volatile_children', \
'_child_failures', '_cancel_self', '_interruptable'
#: Exceptions which are *not* re-raised from concurrent tasks
SUPPRESS_CONCURRENT = (
TaskCancelled, TaskClosed, GeneratorExit
)
#: Exceptions which are always propagated unwrapped
PROMOTE_CONCURRENT = (
SystemExit, KeyboardInterrupt, AssertionError
)
def __init__(self):
#: currently living child tasks
self._children = [] # type: List[Task]
#: currently living child tasks that we won't wait for
self._volatile_children = [] # type: List[Task]
#: failures encountered in children
self._child_failures = [] # type: List[BaseException]
# the scope body is finished and we do/did __aexit__
self._body_done = Flag()
# we can still be cancelled/interrupted asynchronously
self._interruptable = True
self._activity = None # type: Optional[Coroutine]
self._cancel_self = CancelScope(self, 'Scope._cancel_self')
def __await__(self):
yield from self._body_done.__await__()
[docs] def do(
self,
payload: Coroutine[Any, Any, RT],
*,
after: float = None,
at: float = None,
volatile: bool = False
) -> Task[RT]:
r"""
Concurrently perform an activity in this scope
:param payload: the activity to perform
:param after: delay after which to start the activity
:param at: point in time at which to start the activity
:param volatile: whether the activity is aborted at the end of the scope
:raises ScopeClosed: if the scope has ended already
:return: representation of the ongoing activity
All non-``volatile`` activities are ``await``\ ed at the end of the scope.
As a result, the scope only ends after all its child activities are done.
Unhandled exceptions in children cause the parent scope to abort immediately;
all child exceptions are collected and re-raised as part of a single
:py:exc:`~.Concurrent` exception in this case.
If an activity needs to shut down gracefully with its scope,
it can `await` the scope.
.. code:: python
async def graceful(containing_scope: Scope):
print('waiting for end of scope ...')
await containing_scope
print('... scope has finished')
async with Scope() as scope:
scope.do(graceful(scope))
All ``volatile`` activities are aborted at the end of the scope,
after all non-``volatile`` activities have finished.
Aborting ``volatile`` activities is not graceful:
:py:class:`GeneratorExit` is raised in the activity,
which must exit without ``await``\ ing or ``yield``\ ing anything.
The scope assumes exclusive ownership of the ``payload``:
no activity should modify the ``payload`` directly.
The scope takes the responsibility to ``await`` and
cleanup the payload as needed.
It is not possible to :py:meth:`~.do` activities after the scope has ended.
A :py:exc:`~.ScopeClosed` exception is raised in this case.
"""
if not self._interruptable:
# we have been given the payload with the expectation of managing it
# close it now since no-one else should expect to own it
try_close(payload)
raise ScopeClosed(self)
assert after is None or at is None,\
"start date must be either absolute or relative"
# resolve "now" to what the event loop expects
if after == 0:
after = None
elif at == __USIM_STATE__.loop.time:
at = None
assert after is None or after > 0,\
"start date must not be in the past"
assert at is None or at > __USIM_STATE__.loop.time,\
"start date must not be in the past"
child_task = Task(payload, self, delay=after, at=at, volatile=volatile)
__USIM_STATE__.loop.schedule(
child_task.__runner__,
)
if not volatile:
self._children.append(child_task)
else:
self._volatile_children.append(child_task)
return child_task
def __cancel__(self):
"""Cancel this scope"""
if self._interruptable:
__USIM_STATE__.loop.schedule(self._activity, self._cancel_self)
def __child_finished__(self, child: Task, failed: bool):
assert child.parent is self
if failed:
self.__cancel__()
self._child_failures.append(child.__exception__)
if child.__volatile__:
self._volatile_children.remove(child)
else:
self._children.remove(child)
def _disable_interrupts(self):
self._interruptable = False
self._cancel_self.revoke()
async def _await_children(self):
while self._children:
for child in self._children[:]:
await child.done
def _close_children(self):
"""Forcefully close all child non-volatile tasks"""
reason = TaskClosed("closed at end of scope '%s'" % self)
for child in self._children.copy():
child.__close__(reason=reason)
def _close_volatile(self):
"""Forcefully close all volatile child tasks"""
reason = VolatileTaskClosed("closed at end of scope '%s'" % self)
for child in self._volatile_children.copy():
child.__close__(reason=reason)
async def __aenter__(self):
if self._activity is not None:
raise RuntimeError('%r is not re-entrant' % self.__class__.__name__)
self._activity = __USIM_STATE__.loop.activity
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
# there was no exception, we regularly exited the loop body
# we wait for our children to finish or some interrupt to happen
if exc_type is None:
try:
# inform everyone that we are shutting down
# we may receive any shutdown signal here
await self._body_done.set()
await self._await_children()
except BaseException as err:
self._close_scope()
if self._propagate_exceptions(type(err), err):
raise
return True
# there was an exception, we have to abandon the scope fast
# we do not want interrupts that conflict with our current exception
else:
self._body_done._value = True
self._body_done.__trigger__()
self._close_scope()
return not self._propagate_exceptions(exc_type, exc_val)
def _close_scope(self):
"""Ultimately close the scope, its interrupts and all children"""
self._disable_interrupts()
self._close_children()
self._close_volatile()
def _collect_exceptions(self)\
-> Tuple[Optional[BaseException], Optional[Concurrent]]:
"""
collect any privileged and concurrent exceptions that occurred in children
This returns a tuple ``(privileged, concurrent)`` of which both may be
:py:const:`None` if no appropriate exception is found.
"""
suppress = self.SUPPRESS_CONCURRENT
promote = self.PROMOTE_CONCURRENT
concurrent = []
for exc in self._child_failures:
if isinstance(exc, promote):
return exc, None
if not isinstance(exc, suppress):
concurrent.append(exc)
if concurrent:
exc = Concurrent(*concurrent)
# exc.__cause__ shows up in the stacktrace before exc
#
# Since everything leading up to here is not relevant
# for users, there is no harm replacing it. Python only
# allows one cause, so we have to choose one arbitrarily
# - the first *might* be the start of a cascade at least.
exc.__cause__ = concurrent[0]
return None, exc
return None, None
def _propagate_exceptions(self, exc_type, exc_val) -> bool:
if exc_type in self.PROMOTE_CONCURRENT:
# we already have a privileged exception, there is nothing more important
# propagate it
return True
elif self._is_suppressed(exc_val) or exc_type is None:
# we do not have an exception to propagate, take whatever we can get
privileged, concurrent = self._collect_exceptions()
if privileged is not None or concurrent is not None:
raise privileged or concurrent
# we handled our own and there was nothing else to propagate
return False
else:
# we already have an exception to propagate, take only important ones
privileged, _ = self._collect_exceptions()
if privileged is not None:
raise privileged
# we still have our unhandled exception to propagate
return True
def _is_suppressed(self, exc_val) -> bool:
"""
Whether the exception is handled completely by :py:meth:`~.__aexit__`
This generally means that the exception was an interrupt for this scope.
If the exception is meant for anyone else, we should let it propagate.
"""
return exc_val is self._cancel_self
def __repr__(self):
return (
f'<{self.__class__.__name__} in {self._activity} @ {id(self)}, '
f'children={self._children}, volatile={self._volatile_children}, '
f'done={bool(self._body_done)}>'
)
class InterruptScope(Scope):
r"""
Scope that is closed on notification
:see: :py:func:`~.until`
"""
__slots__ = ('_notification', '_interrupt')
def __init__(self, notification: Notification):
super().__init__()
self._notification = notification
self._interrupt = CancelScope(self, notification)
async def __aenter__(self):
await super().__aenter__()
self._notification.__subscribe__(self._activity, self._interrupt)
return self
def _disable_interrupts(self):
self._notification.__unsubscribe__(self._activity, self._interrupt)
super()._disable_interrupts()
def _is_suppressed(self, exc_val) -> bool:
return exc_val is self._interrupt or super()._is_suppressed(exc_val)
def __repr__(self):
return (
f'<{self.__class__.__name__} in {self._activity} @ {id(self)}, '
f'notification={self._notification}, '
f'children={self._children}, '
f'volatile={self._volatile_children}, '
f'done={bool(self._body_done)}>'
)
[docs]def until(notification: Notification):
r"""
:py:class:`Scope` that is interrupted on notification
An asynchronous `until`-scope listens for a notification
*without* stopping execution.
This allows notification on any break point,
e.g. `await` in the context or while waiting for children.
.. code:: python
async with until(done):
await eternity # infinite waiting, interrupted by notification
async with until(done) as scope:
scope.do(eternity) # infinite activity, interrupted by notification
:note: A break point in the context is always required,
even when the notification would trigger immediately.
"""
return InterruptScope(notification)