Source code for usim._basics.streams

r"""
Synchronizing streams that allow consumers to wait for messages from producers

Streams synchronize **adding** and **retrieving** messages
across multiple producers and consumers.
This allows consumers to wait for messages,
and producers to delay messages as required.

Consumers can retrieve individual messages
or iterate over all messages:

.. code:: python

    value = await stream  # consume a single message from the stream

    async for value in stream:  # consume messages until the stream is closed
        ...

Similarly, producers can send individual or several messages at once:

.. code:: python

    await stream.put(value)
    await stream.send(values)

Note that channels exist *exclusively* for message passing,
and do not serve as :py:class:`Notification`\ s.
A channel cannot be used in an ``async with until(...):`` statement.
"""
from collections import deque

from typing import Generic, TypeVar, Dict, List, Deque,\
    Union, AsyncIterable, Generator, Any

from .._primitives.notification import postpone, Notification, NoSubscribers
from .._primitives.locks import Lock


#: Type of channel content
ST = TypeVar('ST')


[docs]class StreamClosed(Exception): def __init__(self, stream): self.stream = stream super().__init__('%r is closed and cannot provide more messages' % stream)
[docs]class Channel(AsyncIterable, Generic[ST]): """ Unbuffered stream that broadcasts every message to all consumers """ @property def closed(self): return self._closed def __init__(self): super().__init__() self._consumer_buffers = {} \ # type: Dict[Any, Union[List[ST], Deque[ST]]] self._notification = Notification() self._closed = False
[docs] async def close(self): """ Prevent putting further messages into the :py:class:`~.Channel` Closing a :py:class:`~.Channel` causes subsequent attempts to :py:meth:`~.Channel.put` or retrieve items to fail with :py:exc:`~.StreamClosed`. A :py:class:`~.Channel` can be closed multiple times; subsequent closes have no effects other than :term:`postponement`. """ if not self._closed: self._closed = True self._notification.__awake_all__() await postpone()
def __await__(self) -> Generator[Any, None, ST]: if self._closed: raise StreamClosed(self) sentinel = object() self._consumer_buffers[sentinel] = buffer = [] # type: List[ST] try: yield from self._notification.__await__() finally: del self._consumer_buffers[sentinel] if not buffer and self._closed: raise StreamClosed(self) return buffer[0] # noqa: B901 async def __aiter__(self): sentinel = object() self._consumer_buffers[sentinel] = buffer = deque() # type: Deque[ST] try: while True: while buffer: yield buffer.popleft() if self._closed: break await self._notification finally: del self._consumer_buffers[sentinel]
[docs] async def put(self, item: ST): r""" Put an item into the :py:class:`~.Channel` :param item: the item to broadcast :raises StreamClosed: if the stream has been :py:meth:`~.close`\ d """ if self._closed: raise StreamClosed(self) for buffer in self._consumer_buffers.values(): buffer.append(item) self._notification.__awake_all__() await postpone()
def __repr__(self): return f'<{self.__class__.__name__}, '\ f'consumers={len(self._consumer_buffers)}, closed={self._closed}>'
[docs]class Queue(AsyncIterable, Generic[ST]): """ Buffered stream that anycasts messages to individual consumers """ @property def closed(self): return self._closed def __init__(self): super().__init__() self._buffer = deque() # type: deque[ST] self._notification = Notification() # mutex to ensure readers are ordered self._read_mutex = Lock() self._closed = False
[docs] async def close(self): """ Prevent putting further messages into the :py:class:`~.Queue` Closing a :py:class:`~.Queue` causes subsequent attempts to :py:meth:`~.Queue.put` items to fail with :py:exc:`~.StreamClosed`. When there are no items in a closed :py:class:`~.Queue`, attempts to retrieve items fail with :py:exc:`~.StreamClosed`. Items already buffered may still be received. A :py:class:`~.Queue` can be closed multiple times; subsequent closes have no effects other than :term:`postponement`. """ if not self._closed: self._closed = True self._notification.__awake_all__() await postpone()
def __await__(self) -> Generator[Any, None, ST]: return (yield from self._await_message().__await__()) # noqa: B901 async def _await_message(self): async with self._read_mutex: if self._buffer: await postpone() return self._buffer.popleft() elif self._closed: raise StreamClosed(self) await self._notification try: return self._buffer.popleft() except IndexError: assert self._closed # on failure, report this as a usim bug raise StreamClosed(self) async def __aiter__(self): while True: try: result = await self except StreamClosed: break else: yield result
[docs] async def put(self, item: ST): r""" Put an item into the :py:class:`~.Queue` :param item: the item to enqueue :raises StreamClosed: if the stream has been :py:meth:`~.close`\ d """ if self._closed: raise StreamClosed(self) self._buffer.append(item) try: self._notification.__awake_next__() except NoSubscribers: pass await postpone()
def __repr__(self): return f'<{self.__class__.__name__}, '\ f'buffer=[{", ".join(map(repr, self._buffer))}]>'
Stream = Union[Channel, Queue]