from typing import TypeVar, Generic, Optional, Type, Union, Dict
from .._core.handler import __USIM_STATE__
from ._resource_level import __specialise__, ResourceLevels
from .tracked import Tracked
T = TypeVar('T')
[docs]class ResourcesUnavailable(Exception):
"""Resources requested from a supply are not available"""
__slots__ = 'claim',
def __init__(self, claim: 'ClaimedResources'):
self.claim = claim
class BaseResources(Generic[T]):
"""
Internal base class for resource types
"""
_levels_type: Type[ResourceLevels[T]]
_available: Tracked[ResourceLevels[T]]
#: ResourceLevels representing no available resources
_zero: ResourceLevels[T]
@property
def levels(self) -> ResourceLevels[T]:
"""Current levels of resources"""
return self._available.value
@property
def resource_type(self) -> Type[ResourceLevels[T]]:
"""Type of underlying resources"""
return self._levels_type
async def __insert_resources__(self, amounts: ResourceLevels):
new_levels = self._available.value + amounts
await self._available.set(new_levels)
async def __remove_resources__(self, amounts: ResourceLevels):
new_levels = self._available.value - amounts
await self._available.set(new_levels)
def borrow(self, **amounts: T) -> 'BorrowedResources[T]':
"""
Temporarily borrow resources for a given context
:param amounts: resource levels to borrow
:return: async context to borrow resources
"""
borrowed_levels = self._levels_type(**amounts)
assert self._zero <= borrowed_levels,\
'cannot borrow negative amounts'
return BorrowedResources(self, borrowed_levels)
def claim(self, **amounts: T) -> 'ClaimedResources[T]':
"""
Temporarily borrow resources for a given context if available
:param amounts: resource levels to borrow
:return: async context to borrow resources
:raises ResourcesUnavailable: if the claim is made as resources are unavailable
"""
borrowed_levels = self.borrow(**amounts).limits
return ClaimedResources(self, borrowed_levels)
def __repr__(self):
content = ', '.join(
f'{key}={item}' for key, item in self._available.value
)
return f'{self.__class__.__name__}({content})'
def __eq__(self, other: Union[ResourceLevels[T], Dict[str, T]]):
if isinstance(other, dict):
other = self.resource_type(**other)
return self._available == other
def __ne__(self, other: Union[ResourceLevels[T], Dict[str, T]]):
if isinstance(other, dict):
other = self.resource_type(**other)
return self._available != other
def __gt__(self, other: Union[ResourceLevels[T], Dict[str, T]]):
if isinstance(other, dict):
other = self.resource_type(**other)
return self._available > other
def __ge__(self, other: Union[ResourceLevels[T], Dict[str, T]]):
if isinstance(other, dict):
other = self.resource_type(**other)
return self._available >= other
def __le__(self, other: Union[ResourceLevels[T], Dict[str, T]]):
if isinstance(other, dict):
other = self.resource_type(**other)
return self._available <= other
def __lt__(self, other: Union[ResourceLevels[T], Dict[str, T]]):
if isinstance(other, dict):
other = self.resource_type(**other)
return self._available < other
[docs]class BorrowedResources(BaseResources[T]):
"""
Fixed supply of named resources temporarily taken from another resource supply
"""
@property
def _levels_type(self):
return self._resources._levels_type
@property
def limits(self):
"""Upper limit of resource levels"""
return self._debits
def __init__(self, resources: 'BaseResources', debits: ResourceLevels):
self._resources = resources
self._zero = self._levels_type()
self._debits = debits
self._available = Tracked(self._levels_type())
async def __aenter__(self):
# do not postpone if we can resume immediately
if not self._resources._available >= self._debits:
await (self._resources._available >= self._debits)
await self._resources.__remove_resources__(self._debits)
await self.__insert_resources__(self._debits)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is GeneratorExit:
# we are killed forcefully and cannot perform async operations
# dispatch a new activity to release our resources eventually
__USIM_STATE__.loop.schedule(
self.__remove_resources__(self._debits)
)
__USIM_STATE__.loop.schedule(
self._resources.__insert_resources__(self._debits)
)
else:
await self.__remove_resources__(self._debits)
await self._resources.__insert_resources__(self._debits)
# TODO: forcefully kill off anyone holding our resources?
[docs] def borrow(self, **amounts: T) -> 'BorrowedResources[T]':
borrowing = super().borrow(**amounts)
assert self._debits >= borrowing._debits,\
'cannot borrow beyond capacity'
return borrowing
[docs]class ClaimedResources(BorrowedResources[T]):
"""
Fixed supply of resources temporarily taken without delay
"""
async def __aenter__(self):
# do not postpone if we can resume immediately
if not self._resources._available >= self._debits:
raise ResourcesUnavailable(self)
return await super().__aenter__()
[docs]class Capacities(BorrowedResources[T]):
r"""
Fixed supply of named resources which can be temporarily borrowed
The resources and their maximum capacity are defined
when the resource supply is created.
Afterwards, it is only possible to temporarily :py:meth:`borrow`
resources:
.. code:: python3
# create a limited supply of resources
resources = Capacities(cores=8, memory=16000)
# temporarily remove resources
async with resources.borrow(cores=2, money=4000):
await computation
A :py:class:`~.Capacities` guarantees that its resources are conserved and
cannot be leaked. Once resources are :py:meth:`~.borrow`\ ed, they can
always be returned promptly.
"""
def __init__(self, __zero__: Optional[T] = None, **capacity: T):
resources = Resources(__zero__, **capacity)
super().__init__(resources, resources.levels)
self._available = Tracked(resources.levels)
[docs]class Resources(BaseResources[T]):
r"""
Supply of named resources which can be temporarily borrowed or produced/consumed
The resources and their initial levels are defined
when the resource supply is created.
Afterwards, the level of resources can be permanently :py:meth:`~.increase`\ d or
:py:meth:`~.decrease`\ d as well as temporarily decreased by :py:meth:`borrow`\ ing:
.. code:: python3
# create an open supply of resources
resources = Resources(cores=8, memory=4000)
# increase the resource supply available
await resources.increase(memory=2000)
# temporarily remove resources
async with resources.borrow(cores=2, memory=6000):
await computation
# decrease the resource supply available
await resources.decrease(cores=4)
A :py:class:`~.Capacities` guarantees that it is always possible
to increase the level of available resources.
Once resources are :py:meth:`~.borrow`\ ed, they can
always be returned promptly.
"""
def __init__(self, __zero__: Optional[T] = None, **capacity: T):
if not capacity: # Note: this should be a type-error not assert for consistency
raise TypeError(
'%s requires at least 1 keyword-only argument' % self.__class__.__name__
)
__zero__ = __zero__ if __zero__ is not None else\
type(next(iter(capacity.values())))() # bare type invocation must be zero
self._levels_type = __specialise__(__zero__, capacity.keys())
self._zero = self._levels_type()
self._available = Tracked(self._levels_type(**capacity))
assert self._available >= self._zero,\
'initial capacities must be greater than or equal to zero'
[docs] async def set(self, **amounts: T):
"""
Set the level of resources
:param amounts: resource levels to set
Only levels of resources that are already part of these
:py:class:`~.Resources` can be set. Levels cannot be set
below zero. If a resource is not specified, its level remains
unchanged.
"""
assert self._zero <= self._levels_type(**amounts),\
'cannot increase by negative amounts'
new_levels = dict(self._available.value).copy()
new_levels.update(amounts)
await self._available.set(self._levels_type(**new_levels))
[docs] async def increase(self, **amounts: T):
"""
Increase the level of resources
:param amounts: resource levels to increase
Only levels of resources that are already part of these
:py:class:`~.Resources` can be increased. Levels cannot be increased
by negative amounts. If a resource is not specified, its level remains
unchanged.
"""
delta = self._levels_type(**amounts)
assert self._zero <= delta,\
'cannot increase by negative amounts'
await self.__insert_resources__(delta)
[docs] async def decrease(self, **amounts: T):
"""
Decrease the level of resources
:param amounts: resource levels to decrease
Only levels of resources that are already part of these
:py:class:`~.Resources` can be decreased. Levels cannot be decreased
by negative amounts or below zero. If a resource is not specified, its
level remains unchanged.
"""
delta = self._levels_type(**amounts)
assert self._zero <= delta,\
'cannot decrease by negative amounts'
assert self._zero <= (self._available.value - delta),\
'cannot decrease below zero'
await self.__remove_resources__(delta)