Source code for usim.py.resources.base

from typing import Sequence, ClassVar, Type, TypeVar, Generic
from itertools import takewhile

from ..core import Environment
from ..events import Event

__all__ = ['Put', 'Get', 'BaseResource']


# Implementation Note
# We take some liberty in interpreting SimPy's API. Several parts
# are loosely specified - e.g. BaseResource.GetQueue has to satisfy
# different specs for ``BaseResource`` and ``Request``. We take the
# most general if it means the code gets cleaner.


T = TypeVar('T')


[docs]class BaseRequest(Event[T]): r""" Base class for :py:class:`~.Put` and :py:class:`~.Get` events A request is commonly created via :py:meth:`~.BaseResource.put` or :py:meth:`~.BaseResource.get` and must be ``yield``\ ed by a Process to wait until the request has been served. If a request has not been served but is no longer desired, the process should :py:class:`~.cancel` the request. A request can be used as a context manager to automatically cancel it at the end of the context. Note that the request is not automatically waited on -- this must be done explicitly if desired. .. code:: python3 with resource.get() as request: yield request """ def __init__(self, resource: 'BaseResource'): super().__init__(resource._env) self.resource = resource #: the process that requested the action self.proc = self.env.active_process def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.cancel()
[docs] def cancel(self): """ Cancel the request Requests should be cancelled when they have not been triggered but are no longer needed. This may happen when a process is interrupted while waiting for a request. Cancelling is idempotent: If the request has already been triggered or canceled, :py:meth:`~.cancel` can still be called without error. """ raise NotImplementedError
[docs]class Put(BaseRequest[T]): """Request to put content into a resource""" def __init__(self, resource: 'BaseResource'): super().__init__(resource) # Enqueue the event to be processed... resource.put_queue.append(self) # ...schedule our inverse when we trigger (put item -> get item)... self.callbacks.append(resource._trigger_get) # ...and immediately check whether we could trigger resource._trigger_put(None)
[docs] def cancel(self): if not self.triggered: self.resource.put_queue.remove(self)
[docs]class Get(BaseRequest[T]): """Request to get content out of a resource""" def __init__(self, resource: 'BaseResource'): super().__init__(resource) # Enqueue the event to be processed... resource.get_queue.append(self) # ...schedule our inverse when we trigger (put item -> get item)... self.callbacks.append(resource._trigger_put) # ...and immediately check whether we could trigger resource._trigger_get(None)
[docs] def cancel(self): if not self.triggered: self.resource.get_queue.remove(self)
[docs]class BaseResource(Generic[T]): """ Base class for all synchronised resources of :py:mod:`usim.py` This type codifies the basic semantics of :py:mod:`usim.py` resources: processes can :py:meth:`~.put` content into the resource or :py:meth:`~.get` content out of the resource. Both actions return an :py:class:`~usim.py.events.Event`; once the event triggers, the process did successfully get or put content into or out of the resource. Processes should ``yield`` this even to wait for success of their request. .. code:: python3 def foo(env, resources: BaseResource): print('Getting a resource at', env.time) yield resources.get() print('Got resource at', env.time) yield env.timeout(2) print('Returning a resource at', env.time) yield resources.put() print('Returned a resource at', env.time) Subclasses should define their behaviour by implementing at least one of: :py:attr:`~.BaseResource.PutQueue` or :py:attr:`~.BaseResource.GetQueue` The types used to create the :py:attr:`~.put_queue` and :py:attr:`~.get_queue`. These may, for example, customize priority of queued requests. :py:meth:`~.put` or :py:meth:`~.get` The methods used to create :py:class:`~.Put` and :py:class:`~.Get` events. These may, for example, customize how much of a resource to handle at once. :py:meth:`~._do_put` or :py:meth:`~._do_get` The methods used to process :py:class:`~.Put` and :py:class:`~.Get` events. These are an alternative to customizing :py:meth:`~.put` or :py:meth:`~.get`. .. note:: This is *not* an Abstract Base Class. Subclasses do not need to implement :py:meth:`~.BaseResource._do_get` or :py:meth:`~.BaseResource._do_put`. .. hint:: **Migrating to μSim** There is no common base type for resources in μSim -- instead there are several different types of resources made for various use-cases. If you need to create a custom resource, you are free to choose whatever interface is appropriate for your use-case. μSim itself usually uses the ``await``, ``async for`` and ``async with`` depending on the intended use of a resource. Commonly this means ``await resource`` to get content from a resource, ``await resource.put(...)`` to add content to a resource, ``async for item in resource:`` to subscribe to a resource, and ``async with resource:`` to temporarily use a resource. """ #: The type used to create :py:attr:`~.put_queue` PutQueue: ClassVar[Type[Sequence]] = list #: The type used to create :py:attr:`~.get_queue` GetQueue: ClassVar[Type[Sequence]] = list def __init__(self, env: Environment, capacity): self._env = env self._capacity = capacity #: pending put events self.put_queue = self.PutQueue() #: pending get events self.get_queue = self.GetQueue() @property def capacity(self): """Maximum capacity of the resource""" return self._capacity
[docs] def put(self) -> Put[T]: """Create a request to put content into the resource""" return Put(self)
[docs] def get(self) -> Get[T]: """Create a request to get content out of the resource""" return Get(self)
def _trigger_put(self, get_event: Get): """ Trigger all possible put events Called when a ``Put`` event was created or a ``Get`` event triggered. This trigger scheme handles a new request to an uncontested resource (the ``Put`` succeeds immediately), as well as a full one (the ``Get`` may serve a waiting ``Put``). :param get_event: ``Get`` event that was triggered or :py:const:`None` """ triggered = list(takewhile(self._do_put, self.put_queue)) self.put_queue = self.put_queue[len(triggered):] def _trigger_get(self, put_event: Put): """ Trigger all possible put events Called when a ``Get`` event was created or a ``Put`` event triggered. This trigger scheme handles a new request to an uncontested resource (the ``Get`` succeeds immediately), as well as a full one (the ``Put`` may serve a waiting ``Get``). :param put_event: ``Get`` event that was triggered or :py:const:`None` """ triggered = list(takewhile(self._do_get, self.get_queue)) self.get_queue = self.get_queue[len(triggered):] # NOTE: Per the SimPy spec, these are **PUBLIC**
[docs] def _do_get(self, get_event: Get) -> bool: """ Trigger a :py:class:`~.Get` event if possible :param get_event: the event that may be triggered :return: whether another event may be triggered """ raise NotImplementedError("'_do_get' must be overridden in subclasses")
[docs] def _do_put(self, get_event: Put) -> bool: """ Trigger a :py:class:`~.Put` event if possible :param get_event: the event that may be triggered :return: whether another event may be triggered """ raise NotImplementedError("'_do_put' must be overridden in subclasses")