Source code for usim._basics.pipe

from typing import Optional, Dict

from .._primitives.notification import Notification, suspend, postpone
from .._primitives.timing import time


class Pipe:
    """
    Shared transport for resources with a limited total throughput

    :param throughput: limit of total throughput

    The throughput limit of a pipe is defined when a :py:class:`Pipe` is created.
    Afterwards, :term:`activities <activity>` may temporarily request transfers
    through the :py:class:`Pipe` with a maximum throughput.
    If the sum of throughput from all transfers exceeds the throughput limit,
    transfers are throttled accordingly.

    .. code:: python3

        connection = Pipe(throughput=3)

        # transfers below limit
        await connection.transfer(total=10, throughput=2)  # takes 5 time units

        # transfers above limit
        async with Scope() as scope:  # takes 10 time units
            scope.do(connection.transfer(15, throughput=3))
            scope.do(connection.transfer(15, throughput=3))
    """
    def __init__(self, throughput: float):
        assert throughput > 0, 'throughput must be positive'
        # upper bound for the summed throughput of all concurrent transfers
        self.throughput = throughput
        # awoken whenever the scale factor changes, so that running transfers
        # can recompute their remaining delay
        self._congested = Notification()
        # factor in (0, 1] applied to the throughput requested by each transfer
        self._throughput_scale = 1.0
        # requested throughput of every transfer currently in progress
        self._subscriptions: Dict[object, float] = {}

    async def transfer(
        self, total: float, throughput: Optional[float] = None
    ) -> None:
        """
        Wait until some total volume has been transferred

        :param total: absolute volume to transfer before resuming
        :param throughput: maximum throughput of transfer

        The effective ``throughput`` is bounded by the transfer's ``throughput``
        as well as the Pipe's :py:attr:`~.throughput` weighted by all transfers'
        ``throughput``. For example, if two transfers each request the entire
        :py:attr:`~.throughput`, each receives only half.

        .. code:: python3

            network = Pipe(throughput=64)
            await network.transfer(total=50 * 1024, throughput=128)  # transfer with 64

        If ``throughput`` is not given, it defaults to the Pipe's
        :py:attr:`~.throughput` limit.

        A ``total`` of zero is accepted (despite the wording of the assertion
        message) and returns without suspending. The transfer is always
        unregistered from the pipe when this coroutine exits, including when
        it is ended early by an exception raised while it is suspended.
        """
        assert total >= 0, 'total must be positive'
        assert throughput is None or throughput > 0,\
            'throughput must be positive or None'
        transferred = 0
        # a fresh object is a unique, hashable key for this specific transfer
        identifier = object()
        throughput = throughput if throughput is not None else self.throughput
        self._add_subscriber(identifier, throughput)
        # Unregister in ``finally``: otherwise a transfer that is aborted while
        # suspended would stay in ``_subscriptions`` and keep throttling every
        # later transfer on this pipe.
        try:
            while transferred < total:
                window_start = time.now
                window_throughput = throughput * self._throughput_scale
                # Try to delay until we have transferred everything.
                # Be prepared to get interrupted if throughput changes.
                with self._congested.__subscription__():
                    delay = (total - transferred) / window_throughput
                    if delay > 0:
                        await suspend(delay=delay, until=None)
                    else:
                        await postpone()
                    # At this point, we have been suspended for as long as
                    # calculated. Barring float *imprecision* we have
                    # transferred the desired volume.
                    transferred = total
                # Account for the volume moved during this window. When the
                # window ended early this is the partial progress; when it
                # ran to completion this only pushes ``transferred`` further
                # beyond ``total``, which still terminates the loop.
                window_end = time.now
                transferred += (window_end - window_start) * window_throughput
        finally:
            self._del_subscriber(identifier)

    def _add_subscriber(self, identifier, throughput):
        """Register a transfer's requested throughput and rebalance the pipe"""
        self._subscriptions[identifier] = throughput
        self._throttle_subscribers()

    def _del_subscriber(self, identifier):
        """Unregister a transfer and rebalance the pipe"""
        del self._subscriptions[identifier]
        self._throttle_subscribers()

    def _throttle_subscribers(self):
        """Recompute the scale factor and wake transfers if it may have changed"""
        desired_throughput = sum(self._subscriptions.values())
        if desired_throughput > self.throughput:
            # over-subscribed: every transfer gets the same share of its request
            self._throughput_scale = self.throughput / desired_throughput
            self._congested.__awake_all__()
        elif self._throughput_scale != 1.0:
            # no longer over-subscribed: lift the throttle that was in place
            self._throughput_scale = 1.0
            self._congested.__awake_all__()
class UnboundedPipe(Pipe):
    """
    Shared transport for resources with unlimited total throughput

    This is a noop variant of the regular :py:class:`~usim.Pipe`.
    It serves as a neutral element when a :py:class:`~usim.Pipe`
    is required but no throttling should take place.
    """
    def __init__(self, throughput=float('inf')):
        # The parameter exists only for signature compatibility with the base
        # class; anything other than infinity is rejected.
        assert throughput == float('inf'), (
            'throughput must be infinite; use Pipe for finite throughput'
        )
        super().__init__(throughput=throughput)

    async def transfer(
        self, total: float, throughput: Optional[float] = None
    ) -> None:
        """
        Wait until some total volume has been transferred

        :param total: absolute volume to transfer before resuming
        :param throughput: maximum throughput of transfer

        Since the pipe itself never throttles, the duration depends only on
        the ``throughput`` requested for this transfer. Without a finite
        ``throughput`` the transfer merely postpones once.
        """
        # Mirror the base class from the outside: reject the same inputs and
        # always give other tasks a chance to run.
        assert total >= 0, 'total must be positive'
        assert throughput is None or throughput > 0, (
            'throughput must be positive or None'
        )
        # no finite per-transfer limit: nothing to wait for
        if throughput is None or throughput == float('inf'):
            await postpone()
            return
        duration = total / throughput
        if duration > 0:
            await suspend(delay=duration, until=None)
            return
        # zero volume: still yield once instead of returning immediately
        await postpone()