Source code for usim._primitives.context

from typing import Coroutine, List, TypeVar, Any, Optional, Tuple

from .._core.loop import Interrupt as CoreInterrupt
from .._core.handler import __USIM_STATE__
from .notification import Notification
from .flag import Flag
from .task import Task, TaskClosed, TaskCancelled, try_close
from .concurrent_exception import Concurrent


RT = TypeVar('RT')


[docs]class VolatileTaskClosed(TaskClosed): """A volatile :py:class:`~.Task` forcefully exited at the end of its scope"""
class CancelScope(CoreInterrupt): """A :py:class:`Scope` is being cancelled""" __slots__ = ('subject',) def __init__(self, subject: 'Scope', *token): super().__init__(*token) self.subject = subject class ScopeClosed(RuntimeError): """A :py:class:`~.Scope` has been closed and cannot be used anymore""" __slots__ = 'scope', def __init__(self, scope): #: scope which has been used after close self.scope = scope
[docs]class Scope: r""" Concurrency scope that allows branching off and waiting for multiple activities A new :py:class:`~.Scope` must be opened in an ``async with`` block. During its block, a :py:class:`~.Scope` may :py:meth:`~.Scope.do` several activities concurrently. The :py:class:`~.Scope` owns and supervises all branched off activities. .. code:: python3 async def show_clock(interval=1): "An infinite loop showing the current time" async for now in every(interval=interval): print(now) async with Scope() as scope: scope.do(time + 20) # scope can launch multiple activities at once... scope.do(time + 20) scope.do(time + 20) scope.do( show_clock(), volatile=True ) # ...and mark some as expendable on exit # block is exited once the three delays finished concurrently # block is done after a total delay of 20 Both the block of scope and all its activities form one unit of control. A :py:class:`~.Scope` will only exit once its block and all non-``volatile`` activities are done. If either encounters an unhandled exception, all are aborted; exceptions from child tasks are collapsed into a single :py:exc:`~.Concurrent` exception which is raised by the :py:class:`~.Scope`. Only the fatal exception types :py:exc:`SystemExit`, :py:exc:`KeyboardInterrupt`, and :py:exc:`AssertionError` are not collapsed, but propagated directly. During its lifetime, a :py:class:`~.Scope` can be passed around freely. Most importantly, it can be passed to child activities. This allows to :py:meth:`~.Scope.do` things in a parent scope, and to ``await`` the end of the scope. .. code:: python3 def do_some(scope): "Perform several actions in a parent scope" for delay in range(0, 20, 5): scope.do(time + delay) async def on_done(scope): "Wait for a scope to end and report it" await scope print('Scope is done at', time.now) async with Scope() as scope: do_some(scope) # pass scope around to do activities in it on_done(scope) # pass scope around to await its end """ __slots__ = '_children', '_body_done', '_activity', '_volatile_children', \ '_child_failures', '_cancel_self', '_interruptable' #: Exceptions which are *not* re-raised from concurrent tasks SUPPRESS_CONCURRENT = ( TaskCancelled, TaskClosed, GeneratorExit ) #: Exceptions which are always propagated unwrapped PROMOTE_CONCURRENT = ( SystemExit, KeyboardInterrupt, AssertionError ) def __init__(self): #: currently living child tasks self._children = [] # type: List[Task] #: currently living child tasks that we won't wait for self._volatile_children = [] # type: List[Task] #: failures encountered in children self._child_failures = [] # type: List[BaseException] # the scope body is finished and we do/did __aexit__ self._body_done = Flag() # we can still be cancelled/interrupted asynchronously self._interruptable = True self._activity = None # type: Optional[Coroutine] self._cancel_self = CancelScope(self, 'Scope._cancel_self') def __await__(self): yield from self._body_done.__await__()
[docs] def do( self, payload: Coroutine[Any, Any, RT], *, after: float = None, at: float = None, volatile: bool = False ) -> Task[RT]: r""" Concurrently perform an activity in this scope :param payload: the activity to perform :param after: delay after which to start the activity :param at: point in time at which to start the activity :param volatile: whether the activity is aborted at the end of the scope :raises ScopeClosed: if the scope has ended already :return: representation of the ongoing activity All non-``volatile`` activities are ``await``\ ed at the end of the scope. As a result, the scope only ends after all its child activities are done. Unhandled exceptions in children cause the parent scope to abort immediately; all child exceptions are collected and re-raised as part of a single :py:exc:`~.Concurrent` exception in this case. If an activity needs to shut down gracefully with its scope, it can `await` the scope. .. code:: python async def graceful(containing_scope: Scope): print('waiting for end of scope ...') await containing_scope print('... scope has finished') async with Scope() as scope: scope.do(graceful(scope)) All ``volatile`` activities are aborted at the end of the scope, after all non-``volatile`` activities have finished. Aborting ``volatile`` activities is not graceful: :py:class:`GeneratorExit` is raised in the activity, which must exit without ``await``\ ing or ``yield``\ ing anything. The scope assumes exclusive ownership of the ``payload``: no activity should modify the ``payload`` directly. The scope takes the responsibility to ``await`` and cleanup the payload as needed. It is not possible to :py:meth:`~.do` activities after the scope has ended. A :py:exc:`~.ScopeClosed` exception is raised in this case. """ if not self._interruptable: # we have been given the payload with the expectation of managing it # close it now since no-one else should expect to own it try_close(payload) raise ScopeClosed(self) assert after is None or at is None,\ "start date must be either absolute or relative" # resolve "now" to what the event loop expects if after == 0: after = None elif at == __USIM_STATE__.loop.time: at = None assert after is None or after > 0,\ "start date must not be in the past" assert at is None or at > __USIM_STATE__.loop.time,\ "start date must not be in the past" child_task = Task(payload, self, delay=after, at=at, volatile=volatile) __USIM_STATE__.loop.schedule( child_task.__runner__, ) if not volatile: self._children.append(child_task) else: self._volatile_children.append(child_task) return child_task
def __cancel__(self): """Cancel this scope""" if self._interruptable: __USIM_STATE__.loop.schedule(self._activity, self._cancel_self) def __child_finished__(self, child: Task, failed: bool): assert child.parent is self if failed: self.__cancel__() self._child_failures.append(child.__exception__) if child.__volatile__: self._volatile_children.remove(child) else: self._children.remove(child) def _disable_interrupts(self): self._interruptable = False self._cancel_self.revoke() async def _await_children(self): while self._children: for child in self._children[:]: await child.done def _close_children(self): """Forcefully close all child non-volatile tasks""" reason = TaskClosed("closed at end of scope '%s'" % self) for child in self._children.copy(): child.__close__(reason=reason) def _close_volatile(self): """Forcefully close all volatile child tasks""" reason = VolatileTaskClosed("closed at end of scope '%s'" % self) for child in self._volatile_children.copy(): child.__close__(reason=reason) async def __aenter__(self): if self._activity is not None: raise RuntimeError('%r is not re-entrant' % self.__class__.__name__) self._activity = __USIM_STATE__.loop.activity return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool: # there was no exception, we regularly exited the loop body # we wait for our children to finish or some interrupt to happen if exc_type is None: try: # inform everyone that we are shutting down # we may receive any shutdown signal here await self._body_done.set() await self._await_children() except BaseException as err: self._close_scope() if self._propagate_exceptions(type(err), err): raise return True # there was an exception, we have to abandon the scope fast # we do not want interrupts that conflict with our current exception else: self._body_done._value = True self._body_done.__trigger__() self._close_scope() return not self._propagate_exceptions(exc_type, exc_val) def _close_scope(self): """Ultimately close the scope, its interrupts and all children""" self._disable_interrupts() self._close_children() self._close_volatile() def _collect_exceptions(self)\ -> Tuple[Optional[BaseException], Optional[Concurrent]]: """ collect any privileged and concurrent exceptions that occurred in children This returns a tuple ``(privileged, concurrent)`` of which both may be :py:const:`None` if no appropriate exception is found. """ suppress = self.SUPPRESS_CONCURRENT promote = self.PROMOTE_CONCURRENT concurrent = [] for exc in self._child_failures: if isinstance(exc, promote): return exc, None if not isinstance(exc, suppress): concurrent.append(exc) if concurrent: exc = Concurrent(*concurrent) # exc.__cause__ shows up in the stacktrace before exc # # Since everything leading up to here is not relevant # for users, there is no harm replacing it. Python only # allows one cause, so we have to choose one arbitrarily # - the first *might* be the start of a cascade at least. exc.__cause__ = concurrent[0] return None, exc return None, None def _propagate_exceptions(self, exc_type, exc_val) -> bool: if exc_type in self.PROMOTE_CONCURRENT: # we already have a privileged exception, there is nothing more important # propagate it return True elif self._is_suppressed(exc_val) or exc_type is None: # we do not have an exception to propagate, take whatever we can get privileged, concurrent = self._collect_exceptions() if privileged is not None or concurrent is not None: raise privileged or concurrent # we handled our own and there was nothing else to propagate return False else: # we already have an exception to propagate, take only important ones privileged, _ = self._collect_exceptions() if privileged is not None: raise privileged # we still have our unhandled exception to propagate return True def _is_suppressed(self, exc_val) -> bool: """ Whether the exception is handled completely by :py:meth:`~.__aexit__` This generally means that the exception was an interrupt for this scope. If the exception is meant for anyone else, we should let it propagate. """ return exc_val is self._cancel_self def __repr__(self): return ( f'<{self.__class__.__name__} in {self._activity} @ {id(self)}, ' f'children={self._children}, volatile={self._volatile_children}, ' f'done={bool(self._body_done)}>' )
class InterruptScope(Scope): r""" Scope that is closed on notification :see: :py:func:`~.until` """ __slots__ = ('_notification', '_interrupt') def __init__(self, notification: Notification): super().__init__() self._notification = notification self._interrupt = CancelScope(self, notification) async def __aenter__(self): await super().__aenter__() self._notification.__subscribe__(self._activity, self._interrupt) return self def _disable_interrupts(self): self._notification.__unsubscribe__(self._activity, self._interrupt) super()._disable_interrupts() def _is_suppressed(self, exc_val) -> bool: return exc_val is self._interrupt or super()._is_suppressed(exc_val) def __repr__(self): return ( f'<{self.__class__.__name__} in {self._activity} @ {id(self)}, ' f'notification={self._notification}, ' f'children={self._children}, ' f'volatile={self._volatile_children}, ' f'done={bool(self._body_done)}>' )
[docs]def until(notification: Notification): r""" :py:class:`Scope` that is interrupted on notification An asynchronous `until`-scope listens for a notification *without* stopping execution. This allows notification on any break point, e.g. `await` in the context or while waiting for children. .. code:: python async with until(done): await eternity # infinite waiting, interrupted by notification async with until(done) as scope: scope.do(eternity) # infinite activity, interrupted by notification :note: A break point in the context is always required, even when the notification would trigger immediately. """ return InterruptScope(notification)