Source code for usim._primitives.task

from functools import wraps
import reprlib
import enum
from typing import Coroutine, TypeVar, Awaitable, Optional, Tuple, Any, List,\
    TYPE_CHECKING

from .._core.loop import Interrupt
from .._core.handler import __USIM_STATE__
from .notification import suspend
from .condition import Condition
if TYPE_CHECKING:
    from .context import Scope


RT = TypeVar('RT')


def try_close(coroutine: Coroutine):
    """Attempt to close a coroutine-like object if possible"""
    try:
        close = coroutine.close
    except AttributeError:
        pass
    else:
        close()


# enum.Flag is Py3.6+
[docs]class TaskState(enum.Flag if hasattr(enum, 'Flag') else enum.IntEnum): """State of a :py:class:`~.Task`""" #: created but not running yet CREATED = 2 ** 0 #: being executed at the moment RUNNING = 2 ** 1 #: finished due to cancellation CANCELLED = 2 ** 2 #: finished due to an unhandled exception FAILED = 2 ** 3 #: finished normally SUCCESS = 2 ** 4 #: finished by any means FINISHED = CANCELLED | FAILED | SUCCESS
[docs]class TaskCancelled(Exception): """A :py:class:`~.Task` has been cancelled""" __slots__ = ('subject',) def __init__(self, subject: 'Task', *token): super().__init__(*token) #: the cancelled Task self.subject = subject
[docs]class CancelTask(Interrupt): """A :py:class:`~.Task` is being cancelled""" __slots__ = ('subject',) def __init__(self, subject: 'Task', *token): super().__init__(*token) #: the Task being cancelled self.subject = subject @property def __transcript__(self) -> TaskCancelled: result = TaskCancelled(self.subject, *self.token) result.__cause__ = self return result
[docs]class TaskClosed(Exception): """A :py:class:`~.Task` forcefully exited"""
[docs]class Task(Awaitable[RT]): """ Concurrently running activity A :py:class:`Task` wraps a ``payload`` :term:`activity` that is concurrently run in a ``parent`` :py:class:`~.Scope`. This allows to store or pass on the :py:class:`Task` in order to control the underlying activity. Other activities can ``await`` a :py:class:`Task` to receive results or exceptions on completion, similar to a regular activity. .. code:: python3 async def my_activity(delay): await (time + delay) return delay await my_activity() # await an activity async with Scope() as scope: task = scope.do(my_activity()) await task # await Task of an activity In contrast to a bare activity, it is possible to * :py:meth:`~.Task.cancel` a :py:class:`Task` before completion, * ``await`` the result of a :py:class:`Task` multiple times, and * ``await`` that a :py:class:`Task` is :py:attr:`~.Task.done`. :note: This class should not be instantiated directly. Always use a :py:class:`~.Scope` to create it. """ __slots__ = 'payload', '_result', '__runner__', '_cancellations', '_done',\ '__volatile__', 'parent' def __init__( self, payload: Coroutine[Any, Any, RT], parent: 'Scope', delay: Optional[float], at: Optional[float], volatile: bool, ): @wraps(payload) async def payload_wrapper(): # check for a pre-run cancellation if self._result is not None: try_close(self.payload) self.parent.__child_finished__(self, failed=False) return try: # We suspend the Task internally instead of waiting to start # the Task externally. This is because starting must *always* # be done via ``Task.__runner__.send(None)`` which we *cannot* # cancel cleanly. An internal suspension means we *can* cancel # the Task pre-run because no time passes until we check that. if delay or at: await suspend(delay=delay, until=at) result = await self.payload except CancelTask as err: assert ( err.subject is self ), "task for activity %r received cancellation of %r" % ( self, err.subject ) self._result = None, err.__transcript__ self.parent.__child_finished__(self, failed=False) except GeneratorExit: # We are NOT allowed to do any async once the generator # exits forcefully. # We should only receive GeneratorExit due to a forceful # termination in self.__close__ or during cleanup. self.parent.__child_finished__(self, failed=False) except BaseException as err: self._result = None, err self.parent.__child_finished__(self, failed=True) else: self._result = result, None self.parent.__child_finished__(self, failed=False) for cancellation in self._cancellations: cancellation.revoke() try_close(self.payload) self._done.__set_done__() self.__volatile__ = volatile self._cancellations = [] # type: List[CancelTask] self._result = None \ # type: Optional[Tuple[Optional[RT], Optional[BaseException]]] self.payload = payload self.parent = parent self._done = Done(self) self.__runner__ = payload_wrapper() # type: Coroutine[Any, Any, RT] def __await__(self): yield from self._done.__await__() result, error = self._result if error is not None: raise error else: return result # noqa: B901 @property def __exception__(self) -> Optional[BaseException]: """Get the exception of this task""" assert self._result is not None,\ 'Task.__exception__ may only be queried for finished tasks' return self._result[1] @property def done(self) -> 'Done': """ :py:class:`~.Condition` whether the :py:class:`~.Task` has stopped running. This includes completion, cancellation and failure. """ return self._done @property def status(self) -> TaskState: """The current status of this activity""" if self._result is not None: result, error = self._result if error is not None: return ( TaskState.CANCELLED if isinstance(error, (TaskCancelled, TaskClosed)) else TaskState.FAILED ) return TaskState.SUCCESS # a stripped-down version of `inspect.getcoroutinestate` if self.__runner__.cr_frame.f_lasti == -1: return TaskState.CREATED return TaskState.RUNNING def __close__(self, reason=TaskClosed('activity closed')): """ Close the underlying coroutine This is similar to calling :py:meth:`Coroutine.close`, but ensures that waiting activities are properly notified. """ # we have not FINISHED running yet, and can still change the result if self._result is None: self._result = None, reason if self.__runner__.cr_frame.f_lasti == -1: # We have not STARTED running yet # This means __runner__ will start running in the same time frame. # We cannot .close() it, since it must receive the un-cancellable # initial .send(None). # We prepare the state *as if* we had stopped; the __runner__ # will then shutdown at a later turn without observable side-effects. self._done.__set_done__() else: # We are RUNNING and __runner__ is prepared to catch GeneratorExit # Close the __runner__ to have it clean up and finalize everything. self.__runner__.close()
[docs] def cancel(self, *token) -> None: """ Cancel this task during the current time step If the :py:class:`~.Task` is running, a :py:class:`~.CancelTask` is raised once the activity suspends. The activity may catch and react to :py:class:`~.CancelActivity`, but should not suppress it. If the :py:class:`~.Task` is :py:attr:`~.Task.done` before :py:class:`~.CancelTask` is raised, the cancellation is ignored. This also means that cancelling an activity multiple times is allowed, but only the first successful cancellation is stored as the cancellation cause. If the :py:class:`~.Task` has not started running, it is cancelled immediately. This prevents any code execution, even before the first suspension. :warning: The timing of cancelling a Task before it started running may change in the future. """ if self._result is None: if self.status is TaskState.CREATED: self._result = None, TaskCancelled(self, *token) self._done.__set_done__() else: cancellation = CancelTask(self, *token) self._cancellations.append(cancellation) cancellation.scheduled = True __USIM_STATE__.loop.schedule(self.__runner__, signal=cancellation)
@reprlib.recursive_repr() def __repr__(self): child_status = 'active' if self._result is None else ( f'result={self._result[0]!r}' if self._result[1] is None else f'signal={self._result[1]!r}' ) return ( f'<{self.__class__.__name__} object payload={self.payload}[{child_status}] ' f'parent={self.parent}>' ) def __del__(self): # Since a Task is only meant for use in a controlled # fashion, going out of scope unexpectedly means there is # a bug/error somewhere. This should be accompanied by an # error message or traceback. # In order not to detract with auxiliary, useless resource # warnings, we clean up silently to hide our abstraction. self.__runner__.close()
class Done(Condition): """Whether a :py:class:`Task` has stopped running""" __slots__ = ('_task', '_value', '_inverse') def __init__(self, task: Task): super().__init__() self._task = task self._value = False self._inverse = NotDone(self) def __bool__(self): return self._value def __invert__(self): return self._inverse def __set_done__(self): """Set the boolean value of this condition""" assert not self._value self._value = True self.__trigger__() def __repr__(self): return f'<{self.__class__.__name__} for {self._task!r}>' class NotDone(Condition): """Whether a :py:class:`Task` has not stopped running""" __slots__ = ('_done',) def __init__(self, done: Done): super().__init__() self._done = done def __bool__(self): return not self._done def __invert__(self): return self._done def __repr__(self): return f'<{self.__class__.__name__} for {self._done._task!r}>'