from typing import List, Tuple, Coroutine, Optional
from contextlib import contextmanager
from .._core.loop import Interrupt, __HIBERNATE__
from .._core.handler import __USIM_STATE__
# TODO: add protocol for destroying a notification
class NoSubscribers(Exception):
    """Raised when a notification should wake a waiter but has none"""
async def postpone():
    """
    Postpone a coroutine in the current time step

    The currently active task is rescheduled on the loop with a private
    wake-up interrupt and then hibernates. Being woken by that private
    interrupt ends the postponement normally; any other interrupt is
    re-raised to the caller.
    """
    event_loop = __USIM_STATE__.loop
    current = event_loop.activity
    resume = Interrupt('postpone', current)
    event_loop.schedule(current, signal=resume)
    try:
        await __HIBERNATE__
    except Interrupt as received:
        if received is resume:
            # our own wake-up signal: nothing to propagate
            return
        assert current is event_loop.activity, (
            'Break points cannot be passed to other coroutines'
        )
        raise
    finally:
        # always withdraw the wake-up, whether or not it was delivered
        resume.revoke()
async def suspend(
    *, delay: Optional[float] = None, until: Optional[float] = None
):
    """
    Suspend a coroutine until a future time step

    This will safely requeue the current task,
    allowing other tasks to run and interrupts to occur.
    Time will pass as if ``time == until`` or ``time + delay``
    were used, but there is no ``Condition`` interface on top.

    Both parameters are keyword-only and default to ``None``,
    so a caller only has to pass the one it needs.

    :param delay: forwarded to the loop's ``schedule`` as ``delay``
    :param until: forwarded to the loop's ``schedule`` as ``at``
    :raises Interrupt: any interrupt other than the private wake-up signal
    """
    loop = __USIM_STATE__.loop
    task = loop.activity
    # NOTE(review): the 'postpone' label looks copied from ``postpone``;
    # kept unchanged since the role of the token is not visible here - confirm
    wake_up = Interrupt('postpone', task)
    loop.schedule(task, signal=wake_up, delay=delay, at=until)
    try:
        await __HIBERNATE__
    except Interrupt as err:
        # our own wake-up is swallowed, foreign interrupts are passed on
        if err is not wake_up:
            assert (
                task is loop.activity
            ), 'Break points cannot be passed to other coroutines'
            raise
    finally:
        # always withdraw the wake-up, whether or not it was delivered
        wake_up.revoke()
class Notification:
    """
    Synchronisation point to which activities can subscribe

    .. code:: python

        await notification  # hibernate until notified

        async with until(notification):
            ...

    Subscribers are stored in subscription order as ``(task, interrupt)``
    pairs. Waking a subscriber schedules the task on the current loop with
    its interrupt as the signal.
    """
    __slots__ = ('_waiting',)

    def __init__(self):
        self._waiting = []  # type: List[Tuple[Coroutine, Interrupt]]

    def __await__(self):
        # stay subscribed for exactly as long as the task hibernates
        with self.__subscription__():
            yield from __HIBERNATE__

    def __awake_next__(self) -> Tuple[Coroutine, Interrupt]:
        """
        Awake the oldest waiter

        :return: the ``(task, interrupt)`` pair that was scheduled
        :raises NoSubscribers: if there is no waiter to awake
        """
        try:
            waiter, interrupt = self._waiting.pop(0)
        except IndexError:
            # the IndexError is an implementation detail of the waiter list
            raise NoSubscribers from None
        else:
            __USIM_STATE__.loop.schedule(waiter, signal=interrupt)
            return waiter, interrupt

    def __awake_all__(self) -> List[Tuple[Coroutine, Interrupt]]:
        """
        Awake all waiters

        :return: all ``(task, interrupt)`` pairs that were scheduled
        """
        # detach the waiters before scheduling any of them
        awoken = self._waiting.copy()
        self._waiting.clear()
        for waiter, interrupt in awoken:
            __USIM_STATE__.loop.schedule(waiter, signal=interrupt)
        return awoken

    # Subscribe/Unsubscribe
    def __subscribe__(self, waiter: Coroutine, interrupt: Interrupt):
        """Subscribe a task to this notification"""
        self._waiting.append((waiter, interrupt))

    def __unsubscribe__(self, waiter: Coroutine, interrupt: Interrupt):
        """
        Unsubscribe a subscribed task

        An interrupt that is already scheduled is revoked;
        otherwise the ``(waiter, interrupt)`` pair is removed from the
        waiting list, which raises :py:exc:`ValueError` if it is not there.
        """
        if interrupt.scheduled:
            interrupt.revoke()
        else:
            self._waiting.remove((waiter, interrupt))

    @contextmanager
    def __subscription__(self):
        """
        Context in which the current activity is subscribed

        On entry, the loop's current activity is subscribed with a fresh
        interrupt; on exit it is always unsubscribed again. The context
        swallows its own interrupt and re-raises any other one.
        """
        loop = __USIM_STATE__.loop
        task = loop.activity
        wake_up = Interrupt(self, task)
        self.__subscribe__(task, wake_up)
        try:
            yield
        except Interrupt as err:
            if err is not wake_up:
                assert (
                    task is loop.activity
                ), 'Break points cannot be passed to other coroutines'
                raise
        finally:
            self.__unsubscribe__(task, wake_up)

    if __debug__:
        def __del__(self):
            # Debug aid only: Python does not propagate exceptions raised
            # in ``__del__``, it reports them on stderr instead.
            if self._waiting:
                raise RuntimeError(
                    '%r collected without releasing %d waiting tasks:\n %s' % (
                        self, len(self._waiting), self._waiting
                    )
                )

    def __repr__(self):
        return '<%s, waiters=%d>' % (self.__class__.__name__, len(self._waiting))