from typing import Optional, Tuple, Union, Type, Dict, Any, ClassVar
from weakref import WeakValueDictionary
class MetaConcurrent(type):
"""
Metaclass to implement specialisation of :py:exc:`Concurrent`
Provides specialisation via subscription and corresponding type checks:
``Type[spec]`` and ``issubclass(Type[spec], Type[spec, spec2])``. Accepts
the specialisation ``...`` (:py:const:`Ellipsis`) to mark the specialisation
as inclusive, meaning a subtype may have additional specialisations.
"""
# metaclass instance fields - i.e. class fields
# used to define specialisations of a base type
inclusive: bool
specialisations: 'Optional[Tuple[Type[Union[Concurrent, Exception]], ...]]'
template: 'MetaConcurrent'
__specialisations__: WeakValueDictionary
# Called when constructing a class (an instance of the metaclass)
# The following corresponds to calling __new__:
#
# class name(*bases, specialisations=specialisations, inclusive=inclusive):
# <namespace>
#
# __new__ is called once per type hierarchy when the template
# is defined using `class ...`.
# Afterwards, __getitem__ (i.e. Class[...]) explicitly calls
# __new__ to create specialisations.
def __new__(
mcs,
name: str,
bases: Tuple[Type, ...],
namespace: Dict[str, Any],
specialisations:
'Optional[Tuple[Type[Union[Concurrent, Exception]], ...]]' = None,
inclusive: bool = True,
**kwargs,
):
cls = super().__new__(
mcs, name, bases, namespace, **kwargs
) # type: MetaConcurrent
if specialisations is not None:
template = bases[0]
else:
inclusive = True
template = cls
cls.inclusive = inclusive
cls.specialisations = specialisations
cls.template = template
return cls
# Implementation Note:
# The Python data model defines both
# * ``isinstance(a, b) => type(b).__instancecheck__(b, a)``
# * ``issubclass(a, b) => type(b).__subclasscheck__(b, a)``
# So we could need either for error handling.
#
# The Python language translates the except clause of
# try: raise a
# except b as err: <block>
# to ``if issubclass(type(a), b): <block>``.
#
# Which means we need just ``__subclasscheck__`` for error handling.
# We implement ``__instancecheck__`` for consistency only.
def __instancecheck__(cls, instance):
"""``isinstance(instance, cls)``"""
return cls.__subclasscheck__(type(instance))
def __subclasscheck__(cls, subclass):
"""``issubclass(subclass, cls)``"""
# issubclass(A, A)
if cls is subclass:
return True
try:
template = subclass.template
except AttributeError:
return False
else:
# if we are templated, check that the specialisation matches
# the superclass specialisation must be at least
# as general as the subclass specialisation
if template == cls.template:
# except MultiError:
# issubclass(A[???], A)
# the base class is the superclass of all its specialisations
if cls.specialisations is None:
return True
# except MultiError[]:
# issubclass(A[???], A[???])
else:
return cls._subclasscheck_specialisation(subclass)
return False
def _subclasscheck_specialisation(cls, subclass: 'MetaConcurrent'):
"""``issubclass(:Type[subclass.specialisation], Type[:cls.specialisation])``"""
# specialisations are covariant - if A <: B, then Class[A] <: Class[B]
#
# This means that we must handle cases where specialisations
# match multiple times - for example, when matching
# Class[B] against Class[A, B], then B matches both A and B,
#
# Make sure that ``cls`` has no unmatched specialisations
matched_specialisations = all(
any(
issubclass(child, specialisation)
for child in subclass.specialisations
) for specialisation in cls.specialisations
)
if not matched_specialisations:
return False
# except MultiError[KeyError, ...]
elif cls.inclusive:
# We do not care if ``subclass`` has unmatched specialisations
return True
# except MultiError[KeyError]:
else:
# Make sure that ``subclass`` has no unmatched specialisations
#
# We need to check every child of subclass instead of comparing counts.
# This is needed in case that we have duplicate matches. Consider:
# Concurrent[KeyError, LookupError], Concurrent[KeyError, RuntimeError]
return not any(
not issubclass(child, cls.specialisations)
for child in subclass.specialisations
)
# Specialisation Interface
# Allows to do ``Cls[A, B, C]`` to specialise ``Cls`` with ``A, B, C``.
# This part is the only one that actually understands ``...``.
#
# Expect this to be called by user-facing code, either directly or as a result
# of ``Cls(A(), B(), C())``. Errors should be reported appropriately.
#
# Unlike () calls, [] calls only take a single argument.
# Multiple arguments get passed as a tuple:
# - Cls[a] means Cls.__getitem__(a)
# - Cls[a, b] means Cls.__getitem__((a, b))
def __getitem__(
cls,
item: # [Exception] or [...] or [Exception, ...]
Union[
Type[Exception],
'Type[Concurrent]',
'ellipsis', # noqa
'Tuple[Union[Type[Concurrent], Type[Exception], "ellipsis"], ...]', # noqa
]
):
"""``cls[item]`` - used to specialise ``cls`` with ``item``"""
# validate/normalize parameters
#
# Cls[A, B][C]
if cls.specialisations is not None:
raise TypeError(f'Cannot specialise already specialised {cls.__name__!r}')
# Cls[...]
if item is ...:
return cls
# Cls[item]
elif type(item) is not tuple:
assert issubclass(item, (Exception, cls)), (
f'{cls.__name__!r} may only be specialised by Exception subclasses, '
f'not {item}'
)
item = (item,)
# Cls[item1, item2]
else:
assert all(
(child is ...) or issubclass(child, (Exception, cls)) for child in item
), (
f'{cls.__name__!r} may only be specialised by Exception subclasses, '
f'not {item}'
)
return cls._get_specialisation(item)
def _get_specialisation(cls, item):
# provide specialised class
#
# If a type already exists for the given specialisation, we return that
# same type. This avoids class creation and allows fast `A is B` checks.
#
# Each template stores the currently used __specialisations__, indexed
# by a set of items - this eliminates duplicates and ordering as well.
unique_spec = frozenset(item)
try:
specialised_cls = cls.__specialisations__[unique_spec]
except KeyError:
inclusive = ... in unique_spec
specialisations = tuple(child for child in unique_spec if child is not ...)
# the specialisation string "KeyError, IndexError, ..."
spec = ", ".join(
child.__name__ for child in specialisations
) + (', ...' if inclusive else '')
# the specialised subclass, as in
#
# class 'cls.__name__[spec]'(cls, specialisations, inclusive):
# pass
#
# Note: type(name, bases, namespace) parameters cannot be passed by keyword
specialised_cls = MetaConcurrent(
f'{cls.__name__}[{spec}]', (cls,), {},
specialisations=specialisations, inclusive=inclusive
)
cls.__specialisations__[unique_spec] = specialised_cls
return specialised_cls
def __repr__(cls):
return f"<class 'usim.{cls.__name__}'>"
[docs]class Concurrent(BaseException, metaclass=MetaConcurrent):
"""
Exception from one or more concurrent :term:`activity`
A meta-exception that represents any :py:exc:`Exception` of any failing
:py:class:`~usim.typing.Task` of a :py:class:`~usim.Scope`. This does not
include any :py:exc:`Exception` thrown in the body of the scope. As a result,
it is possible to separately handle concurrent and regular exceptions:
.. code:: python3
try:
async with Scope() as scope:
if random.random() < 0.5:
scope.do(
async_raise(RuntimeError('concurrent'))
)
else:
raise RuntimeError('scoped')
except RuntimeError:
print('Failed in body')
except Concurrent:
print('Failed in child')
In addition to separating concurrent and regular exceptions,
:py:class:`~.Concurrent` can also separate different concurrent exception types.
Subscribing the :py:class:`~.Concurrent` type as ``Concurrent[Exception]``
specialises ``except`` clauses to a specific concurrent :py:exc:`Exception`:
.. code:: python3
try:
async with Scope() as scope:
if random.random() < 0.333:
scope.do(async_raise(KeyError('concurrent')))
elif random.random() < 0.5:
scope.do(async_raise(IndexError('concurrent')))
else:
scope.do(async_raise(ValueError('concurrent')))
except Concurrent[KeyError]:
print('Failed key lookup')
except Concurrent[IndexError]:
print('Failed indexing')
except (Concurrent[TypeError], Concurrent[ValueError]):
print('Incorrect type/value of something!')
Since a :py:class:`~usim.Scope` can run more than one :py:class:`~usim.typing.Task`
concurrently, there can be more than one exception as well. Subscribing
:py:class:`~.Concurrent` is possible for several types at once:
``Concurrent[ExceptionA, ExceptionB]`` matches only ``ExceptionA`` and
``ExceptionB`` at the same time, and
``Concurrent[ExceptionA, ExceptionB, ...]`` matches at least ``ExceptionA`` and
``ExceptionB`` at the same time.
.. code:: python3
try:
async with Scope() as scope:
scope.do(async_raise(KeyError('concurrent')))
if random.random() < 0.5:
scope.do(async_raise(IndexError('concurrent')))
if random.random() < 0.5:
scope.do(async_raise(ValueError('concurrent')))
except Concurrent[KeyError]:
print('Failed only key lookup')
except Concurrent[KeyError, IndexError]:
print('Failed key lookup and indexing')
except Concurrent[KeyError, ...]:
print('Failed key lookup and something else')
Note that
``except (Concurrent[A], Concurrent[B]:`` means *either* ``A`` *or* ``B``
whereas
``except Concurrent[A, B]:`` means *both* ``A`` *and* ``B``.
"""
# currently used specialised subclasses
__specialisations__ = WeakValueDictionary()
#: Whether this type accepts additional unmatched specialisations
inclusive: ClassVar[bool]
#: Specialisations this type expects in order to match
specialisations: ClassVar[Optional[Tuple[Type[Exception], ...]]]
#: Basic template of specialisation
template: ClassVar[MetaConcurrent]
#: Exceptions that occurred concurrently
children: 'Tuple[Union[Concurrent, Exception], ...]'
# __new__ automatically specialises Concurrent to match its children.
# Concurrent(A(), B()) => Concurrent[A, B](A(), B())
def __new__(cls: 'Type[Concurrent]', *children: 'Union[Concurrent, Exception]'):
if not children:
assert cls.specialisations is None,\
f"specialisation {cls.specialisations} does not match"\
f" children {children}; Note: Do not 'raise {cls.__name__}'"
return super().__new__(cls)
special_cls = cls[tuple(type(child) for child in children)]
return super().__new__(special_cls)
def __init__(self, *children: 'Union[Concurrent, Exception]'):
super().__init__(children)
self.children = children
def __str__(self):
return \
f'{self.__class__.__name__}: {", ".join(map(repr, self.children))}'
def __repr__(self):
return \
f'<object usim.{self.__class__.__name__} '\
f'of {", ".join(map(repr, self.children))}>'
[docs] def flattened(self) -> 'Concurrent':
"""
Collapse nested Concurrent exceptions
Recursively collapses nested ``Concurrent`` exceptions to provide a single
``Concurrent`` exception containing all :py:attr:`~.children` of the hierarchy.
For example, flattening a ``Concurrent(Concurrent(KeyError()), IndexError())``
provides a ``Concurrent(KeyError(), IndexError())``.
"""
if not any(isinstance(exc, Concurrent) for exc in self.children):
return self
leafs = []
for child in self.children:
if isinstance(child, Concurrent):
leafs.extend(child.flattened().children)
else:
leafs.append(child)
flat = Concurrent(*leafs)
flat.__cause__ = self.__cause__
flat.__context__ = self.__context__
return flat