# SPDX-Copyright: Copyright (c) Capital One Services, LLC
# SPDX-License-Identifier: Apache-2.0
# Copyright 2020 Capital One Services, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
CEL Types: wrappers on Python types to provide CEL semantics.
This can be used by a Python module to work with CEL-friendly values and CEL results.
Examples of distinctions between CEL and Python:
- Unlike Python ``bool``, CEL :py:class:`BoolType` won't do some math.
- CEL has ``int64`` and ``uint64`` subclasses of integer. These have specific ranges and
raise :exc:`ValueError` errors on overflow.
CEL types will raise :exc:`ValueError` for out-of-range values and :exc:`TypeError`
for operations they refuse.
The :py:mod:`evaluation` module can capture these exceptions and turn them into result values.
This can permit the logic operators to quietly silence them via "short-circuiting".
In the normal course of events, CEL's evaluator may attempt operations between a
CEL exception result and an instance of one of CEL types.
We rely on this leading to an ordinary Python :exc:`TypeError` to be raised to propogate
the error. Or. A logic operator may discard the error object.
The :py:mod:`evaluation` module extends these types with it's own :exc:`CELEvalError` exception.
We try to keep that as a separate concern from the core operator implementations here.
We leverage Python features, which means raising exceptions when there is a problem.
Types
=============
See https://github.com/google/cel-go/tree/master/common/types
These are the Go type definitions that are used by CEL:
- BoolType
- BytesType
- DoubleType
- DurationType
- IntType
- ListType
- MapType
- NullType
- StringType
- TimestampType
- TypeType
- UintType
The above types are handled directly byt CEL syntax.
e.g., ``42`` vs. ``42u`` vs. ``"42"`` vs. ``b"42"`` vs. ``42.``.
We provide matching Python class names for each of these types. The Python type names
are subclasses of Python native types, allowing a client to transparently work with
CEL results. A Python host should be able to provide values to CEL that will be tolerated.
A type hint of ``Value`` unifies these into a common hint.
The CEL Go implementation also supports protobuf types:
- dpb.Duration
- tpb.Timestamp
- structpb.ListValue
- structpb.NullValue
- structpb.Struct
- structpb.Value
- wrapperspb.BoolValue
- wrapperspb.BytesValue
- wrapperspb.DoubleValue
- wrapperspb.FloatValue
- wrapperspb.Int32Value
- wrapperspb.Int64Value
- wrapperspb.StringValue
- wrapperspb.UInt32Value
- wrapperspb.UInt64Value
These types involve expressions like the following::
google.protobuf.UInt32Value{value: 123u}
In this case, the well-known protobuf name is directly visible as CEL syntax.
There's a ``google`` package with the needed definitions.
Type Provider
==============================
A type provider can be bound to the environment, this will support additional types.
This appears to be a factory to map names of types to type classes.
Run-time type binding is shown by a CEL expression like the following::
TestAllTypes{single_uint32_wrapper: 432u}
The ``TestAllTypes`` is a protobuf type added to the CEL run-time. The syntax
is defined by this syntax rule::
member_object : member "{" [fieldinits] "}"
The ``member`` is part of a type provider library,
either a standard protobuf definition or an extension. The field inits build
values for the protobuf object.
See https://github.com/google/cel-go/blob/master/test/proto3pb/test_all_types.proto
for the ``TestAllTypes`` protobuf definition that is registered as a type provider.
This expression will describes a Protobuf ``uint32`` object.
Type Adapter
=============
So far, it appears that a type adapter wraps existing Go or C++ types
with CEL-required methods. This seems like it does not need to be implemented
in Python.
Numeric Details
===============
Integer division truncates toward zero.
The Go definition of modulus::
// Mod returns the floating-point remainder of x/y.
// The magnitude of the result is less than y and its
// sign agrees with that of x.
https://golang.org/ref/spec#Arithmetic_operators
"Go has the nice property that -a/b == -(a/b)."
::
x y x / y x % y
5 3 1 2
-5 3 -1 -2
5 -3 -1 2
-5 -3 1 -2
Python definition::
The modulo operator always yields a result
with the same sign as its second operand (or zero);
the absolute value of the result is strictly smaller than
the absolute value of the second operand.
Here's the essential rule::
x//y * y + x%y == x
However. Python ``//`` truncates toward negative infinity. Go ``/`` truncates toward zero.
To get Go-like behavior, we need to use absolute values and restore the signs later.
::
x_sign = -1 if x < 0 else +1
go_mod = x_sign * (abs(x) % abs(y))
return go_mod
Timzone Details
===============
An implementation may have additional timezone names that must be injected into
the ``pendulum`` processing. (Formerly ``dateutil.gettz()``.)
For example, there may be the following sequence:
1. A lowercase match for an alias or an existing timezone.
2. A titlecase match for an existing timezone.
3. The fallback, which is a +/-HH:MM string.
.. TODO: Permit an extension into the timezone lookup.
"""
import datetime
import logging
import re
from functools import reduce, wraps
from math import fsum
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
NoReturn,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
cast,
overload,
)
import pendulum
from pendulum import timezone
import pendulum.tz.exceptions
logger = logging.getLogger(f"celpy.{__name__}")
Value = Union[
"BoolType",
"BytesType",
"DoubleType",
"DurationType",
"IntType",
"ListType",
"MapType",
None, # Used instead of NullType
"StringType",
"TimestampType",
"UintType",
]
# The domain of types used to build Annotations.
CELType = Union[
Type["BoolType"],
Type["BytesType"],
Type["DoubleType"],
Type["DurationType"],
Type["IntType"],
Type["ListType"],
Type["MapType"],
Callable[..., None], # Used instead of NullType
Type["StringType"],
Type["TimestampType"],
Type["TypeType"], # Used to mark Protobuf Type values
Type["UintType"],
Type["PackageType"],
Type["MessageType"],
]
[docs]
def type_matched(method: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]:
"""Decorates a method to assure the "other" value has the same type."""
@wraps(method)
def type_matching_method(self: Any, other: Any) -> Any:
if not (
issubclass(type(other), type(self)) or issubclass(type(self), type(other))
):
raise TypeError(
f"no such overload: {self!r} {type(self)} != {other!r} {type(other)}"
)
return method(self, other)
return type_matching_method
[docs]
def logical_condition(e: Value, x: Value, y: Value) -> Value:
"""
CEL e ? x : y operator.
Choose one of x or y. Exceptions in the unchosen expression are ignored.
Example::
2 / 0 > 4 ? 'baz' : 'quux'
is a "division by zero" error.
::
>>> logical_condition(
... BoolType(True), StringType("this"), StringType("Not That"))
StringType('this')
>>> logical_condition(
... BoolType(False), StringType("Not This"), StringType("that"))
StringType('that')
"""
if not isinstance(e, BoolType):
raise TypeError(f"Unexpected {type(e)} ? {type(x)} : {type(y)}")
result = x if e else y
logger.debug("logical_condition(%r, %r, %r) = %r", e, x, y, result)
return result
[docs]
def logical_and(x: Value, y: Value) -> Value:
"""
Native Python has a left-to-right rule.
CEL && is commutative with non-Boolean values, including error objects.
"""
if not isinstance(x, BoolType) and not isinstance(y, BoolType):
raise TypeError(f"{type(x)} {x!r} and {type(y)} {y!r}")
elif not isinstance(x, BoolType) and isinstance(y, BoolType):
if y:
return x # whatever && true == whatever
else:
return y # whatever && false == false
elif isinstance(x, BoolType) and not isinstance(y, BoolType):
if x:
return y # true && whatever == whatever
else:
return x # false && whatever == false
else:
return BoolType(cast(BoolType, x) and cast(BoolType, y))
[docs]
def logical_not(x: Value) -> Value:
"""
Native python `not` isn't fully exposed for CEL types.
"""
if isinstance(x, BoolType):
result = BoolType(not x)
else:
raise TypeError(f"not {type(x)}")
logger.debug("logical_not(%r) = %r", x, result)
return result
[docs]
def logical_or(x: Value, y: Value) -> Value:
"""
Native Python has a left-to-right rule: (True or y) is True, (False or y) is y.
CEL || is commutative with non-Boolean values, including errors.
``(x || false)`` is ``x``, and ``(false || y)`` is ``y``.
Example 1::
false || 1/0 != 0
is a "no matching overload" error.
Example 2::
(2 / 0 > 3 ? false : true) || true
is a "True"
If the operand(s) are not BoolType, we'll create an TypeError that will become a CELEvalError.
"""
if not isinstance(x, BoolType) and not isinstance(y, BoolType):
raise TypeError(f"{type(x)} {x!r} or {type(y)} {y!r}")
elif not isinstance(x, BoolType) and isinstance(y, BoolType):
if y:
return y # whatever || true == true
else:
return x # whatever || false == whatever
elif isinstance(x, BoolType) and not isinstance(y, BoolType):
if x:
return x # true || whatever == true
else:
return y # false || whatever == whatever
else:
return BoolType(cast(BoolType, x) or cast(BoolType, y))
[docs]
class BoolType(int):
"""
Native Python permits unary operators on Booleans.
For CEL, We need to prevent -false from working.
"""
[docs]
def __new__(cls: Type["BoolType"], source: Any) -> "BoolType":
if source is None:
return super().__new__(cls, 0)
elif isinstance(source, BoolType):
return source
elif isinstance(source, MessageType):
return super().__new__(cls, cast(int, source.get(StringType("value"))))
else:
return super().__new__(cls, source)
[docs]
def __repr__(self) -> str:
return f"{self.__class__.__name__}({bool(self)})"
[docs]
def __str__(self) -> str:
return str(bool(self))
[docs]
def __neg__(self) -> NoReturn:
raise TypeError("no such overload")
[docs]
def __hash__(self) -> int:
return super().__hash__()
[docs]
class BytesType(bytes):
"""Python's bytes semantics are close to CEL."""
[docs]
def __new__(
cls: Type["BytesType"],
source: Union[str, bytes, Iterable[int], "BytesType", "StringType"],
*args: Any,
**kwargs: Any,
) -> "BytesType":
if source is None:
return super().__new__(cls, b"")
elif isinstance(source, (bytes, BytesType)):
return super().__new__(cls, source)
elif isinstance(source, (str, StringType)):
return super().__new__(cls, source.encode("utf-8"))
elif isinstance(source, MessageType):
return super().__new__(
cls,
cast(bytes, source.get(StringType("value"))), # type: ignore [attr-defined]
)
elif isinstance(source, Iterable):
return super().__new__(cls, cast(Iterable[int], source))
else:
raise TypeError(f"Invalid initial value type: {type(source)}")
[docs]
def __repr__(self) -> str:
return f"{self.__class__.__name__}({super().__repr__()})"
[docs]
class DoubleType(float):
"""
Native Python permits mixed type comparisons, doing conversions as needed.
For CEL, we need to prevent mixed-type comparisons from working.
TODO: Conversions from string? IntType? UintType? DoubleType?
"""
[docs]
def __new__(cls: Type["DoubleType"], source: Any) -> "DoubleType":
if source is None:
return super().__new__(cls, 0)
elif isinstance(source, MessageType):
return super().__new__(cls, cast(float, source.get(StringType("value"))))
else:
return super().__new__(cls, source)
[docs]
def __repr__(self) -> str:
return f"{self.__class__.__name__}({super().__repr__()})"
[docs]
def __str__(self) -> str:
text = str(float(self))
return text
[docs]
def __neg__(self) -> "DoubleType":
return DoubleType(super().__neg__())
[docs]
def __mod__(self, other: Any) -> NoReturn:
raise TypeError(
f"found no matching overload for '_%_' applied to '(double, {type(other)})'"
)
[docs]
def __truediv__(self, other: Any) -> "DoubleType":
if cast(float, other) == 0.0:
return DoubleType("inf")
else:
return DoubleType(super().__truediv__(other))
[docs]
def __rmod__(self, other: Any) -> NoReturn:
raise TypeError(
f"found no matching overload for '_%_' applied to '({type(other)}, double)'"
)
[docs]
def __rtruediv__(self, other: Any) -> "DoubleType":
if self == 0.0:
return DoubleType("inf")
else:
return DoubleType(super().__rtruediv__(other))
[docs]
@type_matched
def __eq__(self, other: Any) -> bool:
return super().__eq__(other)
[docs]
@type_matched
def __ne__(self, other: Any) -> bool:
return super().__ne__(other)
[docs]
def __hash__(self) -> int:
return super().__hash__()
IntOperator = TypeVar("IntOperator", bound=Callable[..., int])
[docs]
def int64(operator: IntOperator) -> IntOperator:
"""Apply an operation, but assure the value is within the int64 range."""
@wraps(operator)
def clamped_operator(*args: Any, **kwargs: Any) -> int:
result: int = operator(*args, **kwargs)
if -(2**63) <= result < 2**63:
return result
raise ValueError("overflow")
return cast(IntOperator, clamped_operator)
[docs]
class IntType(int):
"""
A version of int with overflow errors outside int64 range.
features/integer_math.feature:277 "int64_overflow_positive"
>>> IntType(9223372036854775807) + IntType(1)
Traceback (most recent call last):
...
ValueError: overflow
>>> 2**63
9223372036854775808
features/integer_math.feature:285 "int64_overflow_negative"
>>> -IntType(9223372036854775808) - IntType(1)
Traceback (most recent call last):
...
ValueError: overflow
>>> IntType(DoubleType(1.9))
IntType(2)
>>> IntType(DoubleType(-123.456))
IntType(-123)
"""
[docs]
def __new__(
cls: Type["IntType"], source: Any, *args: Any, **kwargs: Any
) -> "IntType":
convert: Callable[..., int]
if source is None:
return super().__new__(cls, 0)
elif isinstance(source, IntType):
return source
elif isinstance(source, MessageType):
# Used by protobuf.
return super().__new__(cls, cast(int, source.get(StringType("value"))))
elif isinstance(source, (float, DoubleType)):
convert = int64(round)
elif isinstance(source, TimestampType):
convert = int64(lambda src: src.timestamp())
elif isinstance(source, (str, StringType)) and source[:2] in {"0x", "0X"}:
convert = int64(lambda src: int(src[2:], 16))
elif isinstance(source, (str, StringType)) and source[:3] in {"-0x", "-0X"}:
convert = int64(lambda src: -int(src[3:], 16))
else:
# Must tolerate "-" as part of the literal.
# See https://github.com/google/cel-spec/issues/126
convert = int64(int)
return super().__new__(cls, convert(source))
[docs]
def __repr__(self) -> str:
return f"{self.__class__.__name__}({super().__repr__()})"
[docs]
def __str__(self) -> str:
text = str(int(self))
return text
[docs]
@int64
def __neg__(self) -> "IntType":
return IntType(super().__neg__())
[docs]
@int64
def __add__(self, other: Any) -> "IntType":
return IntType(super().__add__(cast(IntType, other)))
[docs]
@int64
def __sub__(self, other: Any) -> "IntType":
return IntType(super().__sub__(cast(IntType, other)))
[docs]
@int64
def __mul__(self, other: Any) -> "IntType":
return IntType(super().__mul__(cast(IntType, other)))
[docs]
@int64
def __truediv__(self, other: Any) -> "IntType":
other = cast(IntType, other)
self_sign = -1 if self < IntType(0) else +1
other_sign = -1 if other < IntType(0) else +1
go_div = self_sign * other_sign * (abs(self) // abs(other))
return IntType(go_div)
__floordiv__ = __truediv__
[docs]
@int64
def __mod__(self, other: Any) -> "IntType":
self_sign = -1 if self < IntType(0) else +1
go_mod = self_sign * (abs(self) % abs(cast(IntType, other)))
return IntType(go_mod)
[docs]
@int64
def __radd__(self, other: Any) -> "IntType":
return IntType(super().__radd__(cast(IntType, other)))
[docs]
@int64
def __rsub__(self, other: Any) -> "IntType":
return IntType(super().__rsub__(cast(IntType, other)))
[docs]
@int64
def __rmul__(self, other: Any) -> "IntType":
return IntType(super().__rmul__(cast(IntType, other)))
[docs]
@int64
def __rtruediv__(self, other: Any) -> "IntType":
other = cast(IntType, other)
self_sign = -1 if self < IntType(0) else +1
other_sign = -1 if other < IntType(0) else +1
go_div = self_sign * other_sign * (abs(other) // abs(self))
return IntType(go_div)
__rfloordiv__ = __rtruediv__
[docs]
@int64
def __rmod__(self, other: Any) -> "IntType":
left_sign = -1 if other < IntType(0) else +1
go_mod = left_sign * (abs(other) % abs(self))
return IntType(go_mod)
[docs]
@type_matched
def __eq__(self, other: Any) -> bool:
return super().__eq__(other)
[docs]
@type_matched
def __ne__(self, other: Any) -> bool:
return super().__ne__(other)
[docs]
@type_matched
def __lt__(self, other: Any) -> bool:
return super().__lt__(other)
[docs]
@type_matched
def __le__(self, other: Any) -> bool:
return super().__le__(other)
[docs]
@type_matched
def __gt__(self, other: Any) -> bool:
return super().__gt__(other)
[docs]
@type_matched
def __ge__(self, other: Any) -> bool:
return super().__ge__(other)
[docs]
def __hash__(self) -> int:
return super().__hash__()
[docs]
def uint64(operator: IntOperator) -> IntOperator:
"""Apply an operation, but assure the value is within the uint64 range."""
@wraps(operator)
def clamped_operator(*args: Any, **kwargs: Any) -> int:
result = operator(*args, **kwargs)
if 0 <= result < 2**64:
return result
raise ValueError("overflow")
return cast(IntOperator, clamped_operator)
[docs]
class UintType(int):
"""
A version of int with overflow errors outside uint64 range.
Alternatives:
Option 1 - Use https://pypi.org/project/fixedint/
Option 2 - use array or struct modules to access an unsigned object.
Test Cases:
features/integer_math.feature:149 "unary_minus_no_overload"
>>> -UintType(42)
Traceback (most recent call last):
...
TypeError: no such overload
uint64_overflow_positive
>>> UintType(18446744073709551615) + UintType(1)
Traceback (most recent call last):
...
ValueError: overflow
uint64_overflow_negative
>>> UintType(0) - UintType(1)
Traceback (most recent call last):
...
ValueError: overflow
>>> - UintType(5)
Traceback (most recent call last):
...
TypeError: no such overload
"""
[docs]
def __new__(
cls: Type["UintType"], source: Any, *args: Any, **kwargs: Any
) -> "UintType":
convert: Callable[..., int]
if isinstance(source, UintType):
return source
elif isinstance(source, (float, DoubleType)):
convert = uint64(round)
elif isinstance(source, TimestampType):
convert = uint64(lambda src: src.timestamp())
elif isinstance(source, (str, StringType)) and source[:2] in {"0x", "0X"}:
convert = uint64(lambda src: int(src[2:], 16))
elif isinstance(source, MessageType):
# Used by protobuf.
convert = uint64(
lambda src: src["value"] if src["value"] is not None else 0
)
elif source is None:
convert = uint64(lambda src: 0)
else:
convert = uint64(int)
return super().__new__(cls, convert(source))
[docs]
def __repr__(self) -> str:
return f"{self.__class__.__name__}({super().__repr__()})"
[docs]
def __str__(self) -> str:
text = str(int(self))
return text
[docs]
def __neg__(self) -> NoReturn:
raise TypeError("no such overload")
[docs]
@uint64
def __add__(self, other: Any) -> "UintType":
return UintType(super().__add__(cast(IntType, other)))
[docs]
@uint64
def __sub__(self, other: Any) -> "UintType":
return UintType(super().__sub__(cast(IntType, other)))
[docs]
@uint64
def __mul__(self, other: Any) -> "UintType":
return UintType(super().__mul__(cast(IntType, other)))
[docs]
@uint64
def __truediv__(self, other: Any) -> "UintType":
return UintType(super().__floordiv__(cast(IntType, other)))
__floordiv__ = __truediv__
[docs]
@uint64
def __mod__(self, other: Any) -> "UintType":
return UintType(super().__mod__(cast(IntType, other)))
[docs]
@uint64
def __radd__(self, other: Any) -> "UintType":
return UintType(super().__radd__(cast(IntType, other)))
[docs]
@uint64
def __rsub__(self, other: Any) -> "UintType":
return UintType(super().__rsub__(cast(IntType, other)))
[docs]
@uint64
def __rmul__(self, other: Any) -> "UintType":
return UintType(super().__rmul__(cast(IntType, other)))
[docs]
@uint64
def __rtruediv__(self, other: Any) -> "UintType":
return UintType(super().__rfloordiv__(cast(IntType, other)))
__rfloordiv__ = __rtruediv__
[docs]
@uint64
def __rmod__(self, other: Any) -> "UintType":
return UintType(super().__rmod__(cast(IntType, other)))
[docs]
@type_matched
def __eq__(self, other: Any) -> bool:
return super().__eq__(other)
[docs]
@type_matched
def __ne__(self, other: Any) -> bool:
return super().__ne__(other)
[docs]
def __hash__(self) -> int:
return super().__hash__()
[docs]
class ListType(List[Value]):
"""
Native Python implements comparison operations between list objects.
For CEL, we prevent list comparison operators from working.
We provide an :py:meth:`__eq__` and :py:meth:`__ne__` that
gracefully ignore type mismatch problems, calling them not equal.
See https://github.com/google/cel-spec/issues/127
An implied logical And means a singleton behaves in a distinct way from a non-singleton list.
"""
[docs]
def __repr__(self) -> str:
return f"{self.__class__.__name__}({super().__repr__()})"
[docs]
def __lt__(self, other: Any) -> NoReturn:
raise TypeError("no such overload")
[docs]
def __le__(self, other: Any) -> NoReturn:
raise TypeError("no such overload")
[docs]
def __gt__(self, other: Any) -> NoReturn:
raise TypeError("no such overload")
[docs]
def __ge__(self, other: Any) -> NoReturn:
raise TypeError("no such overload")
[docs]
def __eq__(self, other: Any) -> bool:
if not isinstance(other, (list, ListType)):
raise TypeError(f"no such overload: ListType == {type(other)}")
def equal(s: Any, o: Any) -> Value:
try:
return BoolType(s == o)
except TypeError as ex:
return cast(BoolType, ex) # Instead of Union[BoolType, TypeError]
result = len(self) == len(other) and reduce( # noqa: W503
logical_and, # type: ignore [arg-type]
(equal(item_s, item_o) for item_s, item_o in zip(self, other)),
BoolType(True), # type: ignore [arg-type]
)
if isinstance(result, TypeError):
raise result
return bool(result)
[docs]
def __ne__(self, other: Any) -> bool:
if not isinstance(other, (list, ListType)):
raise TypeError(f"no such overload: ListType != {type(other)}")
def not_equal(s: Any, o: Any) -> Value:
try:
return BoolType(s != o)
except TypeError as ex:
return cast(BoolType, ex) # Instead of Union[BoolType, TypeError]
result = len(self) != len(other) or reduce( # noqa: W503
logical_or, # type: ignore [arg-type]
(not_equal(item_s, item_o) for item_s, item_o in zip(self, other)),
BoolType(False), # type: ignore [arg-type]
)
if isinstance(result, TypeError):
raise result
return bool(result)
BaseMapTypes = Union[Mapping[Any, Any], Sequence[Tuple[Any, Any]], None]
MapKeyTypes = Union["IntType", "UintType", "BoolType", "StringType", str]
[docs]
class MapType(Dict[Value, Value]):
"""
Native Python allows mapping updates and any hashable type as a kay.
CEL prevents mapping updates and has a limited domain of key types.
int, uint, bool, or string keys
We provide an :py:meth:`__eq__` and :py:meth:`__ne__` that
gracefully ignore type mismatch problems for the values, calling them not equal.
See https://github.com/google/cel-spec/issues/127
An implied logical And means a singleton behaves in a distinct way from a non-singleton mapping.
"""
[docs]
def __init__(self, items: BaseMapTypes = None) -> None:
super().__init__()
if items is None:
pass
elif isinstance(items, Sequence):
for name, value in items:
self[name] = value
elif isinstance(items, Mapping):
for name, value in items.items():
self[name] = value
else:
raise TypeError(f"Invalid initial value type: {type(items)}")
[docs]
def __repr__(self) -> str:
return f"{self.__class__.__name__}({super().__repr__()})"
[docs]
def __getitem__(self, key: Any) -> Any:
if not MapType.valid_key_type(key):
raise TypeError(f"unsupported key type: {type(key)}")
return super().__getitem__(key)
[docs]
def __eq__(self, other: Any) -> bool:
if not isinstance(other, (Mapping, MapType)):
raise TypeError(f"no such overload: MapType == {type(other)}")
def equal(s: Any, o: Any) -> BoolType:
try:
return BoolType(s == o)
except TypeError as ex:
return cast(BoolType, ex) # Instead of Union[BoolType, TypeError]
keys_s = self.keys()
keys_o = other.keys()
result = keys_s == keys_o and reduce( # noqa: W503
logical_and, # type: ignore [arg-type]
(equal(self[k], other[k]) for k in keys_s),
BoolType(True), # type: ignore [arg-type]
)
if isinstance(result, TypeError):
raise result
return bool(result)
[docs]
def __ne__(self, other: Any) -> bool:
if not isinstance(other, (Mapping, MapType)):
raise TypeError(f"no such overload: MapType != {type(other)}")
# Singleton special case, may return no-such overload.
if len(self) == 1 and len(other) == 1 and self.keys() == other.keys():
k = next(iter(self.keys()))
return cast(
bool, self[k] != other[k]
) # Instead of Union[BoolType, TypeError]
def not_equal(s: Any, o: Any) -> BoolType:
try:
return BoolType(s != o)
except TypeError as ex:
return cast(BoolType, ex) # Instead of Union[BoolType, TypeError]
keys_s = self.keys()
keys_o = other.keys()
result = keys_s != keys_o or reduce( # noqa: W503
logical_or, # type: ignore [arg-type]
(not_equal(self[k], other[k]) for k in keys_s),
BoolType(False), # type: ignore [arg-type]
)
if isinstance(result, TypeError):
raise result
return bool(result)
[docs]
@staticmethod
def valid_key_type(key: Any) -> bool:
"""Valid CEL key types. Plus native str for tokens in the source when evaluating ``e.f``"""
return isinstance(key, (IntType, UintType, BoolType, StringType, str))
[docs]
class NullType:
"""Python's None semantics aren't quite right for CEL."""
[docs]
def __eq__(self, other: Any) -> bool:
return isinstance(other, NullType)
[docs]
def __ne__(self, other: Any) -> bool:
return not isinstance(other, NullType)
[docs]
class StringType(str):
"""Python's str semantics are very, very close to CEL.
We rely on the overlap between ``"/u270c"`` and ``"/U0001f431"`` in CEL and Python.
"""
[docs]
def __new__(
cls: Type["StringType"],
source: Union[str, bytes, "BytesType", "StringType"],
*args: Any,
**kwargs: Any,
) -> "StringType":
if isinstance(source, (bytes, BytesType)):
return super().__new__(cls, source.decode("utf"))
elif isinstance(source, (str, StringType)):
# TODO: Consider returning the original StringType object.
return super().__new__(cls, source)
else:
return cast(StringType, super().__new__(cls, source))
[docs]
def __repr__(self) -> str:
return f"{self.__class__.__name__}({super().__repr__()})"
[docs]
def __eq__(self, other: Any) -> bool:
return super().__eq__(other)
[docs]
def __ne__(self, other: Any) -> bool:
return super().__ne__(other)
[docs]
def __hash__(self) -> int:
return super().__hash__()
[docs]
class TimestampType(datetime.datetime):
"""
Implements google.protobuf.Timestamp
See https://developers.google.com/protocol-buffers/docs/reference/google.protobuf
Also see https://www.ietf.org/rfc/rfc3339.txt.
The protobuf implementation is an ordered pair of int64 seconds and int32 nanos.
Instead of a Tuple[int, int] we use a wrapper for :py:class:`datetime.datetime`.
From protobuf documentation for making a Timestamp in Python::
now = time.time()
seconds = int(now)
nanos = int((now - seconds) * 10**9)
timestamp = Timestamp(seconds=seconds, nanos=nanos)
Also::
>>> t = TimestampType("2009-02-13T23:31:30Z")
>>> repr(t)
"TimestampType('2009-02-13T23:31:30Z')"
>>> t.timestamp()
1234567890.0
>>> str(t)
'2009-02-13T23:31:30Z'
:strong:`Timezones`
Timezones are expressed in the following grammar:
::
TimeZone = "UTC" | LongTZ | FixedTZ ;
LongTZ = ? list available at
http://joda-time.sourceforge.net/timezones.html ? ;
FixedTZ = ( "+" | "-" ) Digit Digit ":" Digit Digit ;
Digit = "0" | "1" | ... | "9" ;
Fixed timezones are explicit hour and minute offsets from UTC.
Long timezone names are like Europe/Paris, CET, or US/Central.
The Joda project (https://www.joda.org/joda-time/timezones.html)
says "Time zone data is provided by the public IANA time zone database."
TZ handling and timestamp parsing is doine with
the ``pendulum`` (https://pendulum.eustace.io) project.
Additionally, there is a ``TZ_ALIASES`` mapping available in this class to permit additional
timezone names. By default, the mapping is empty, and the only names
available are those recognized by :mod:`pendulum.timezone`.
"""
TZ_ALIASES: Dict[str, str] = {}
[docs]
def __new__(
cls: Type["TimestampType"],
source: Union[int, str, datetime.datetime],
*args: Any,
**kwargs: Any,
) -> "TimestampType":
if isinstance(source, datetime.datetime):
# Wrap a datetime.datetime
return super().__new__(
cls,
year=source.year,
month=source.month,
day=source.day,
hour=source.hour,
minute=source.minute,
second=source.second,
microsecond=source.microsecond,
tzinfo=source.tzinfo or datetime.timezone.utc,
)
elif isinstance(source, int) and len(args) >= 2:
# Wrap a sequence of integers that datetime.datetime might accept.
ts: TimestampType = super().__new__(cls, source, *args, **kwargs)
if not ts.tzinfo:
ts = ts.replace(tzinfo=datetime.timezone.utc)
return ts
elif isinstance(source, str):
# Use dateutil to try a variety of text formats.
parsed_datetime = cast(datetime.datetime, pendulum.parse(source))
return super().__new__(
cls,
year=parsed_datetime.year,
month=parsed_datetime.month,
day=parsed_datetime.day,
hour=parsed_datetime.hour,
minute=parsed_datetime.minute,
second=parsed_datetime.second,
microsecond=parsed_datetime.microsecond,
tzinfo=parsed_datetime.tzinfo,
)
else:
raise TypeError(f"Cannot create {cls} from {source!r}")
[docs]
def __repr__(self) -> str:
return f"{self.__class__.__name__}({str(self)!r})"
[docs]
def __str__(self) -> str:
text = self.strftime("%Y-%m-%dT%H:%M:%S%z")
if text.endswith("+0000"):
return f"{text[:-5]}Z"
return f"{text[:-2]}:{text[-2:]}"
[docs]
def __add__(self, other: Any) -> "TimestampType":
"""Timestamp + Duration -> Timestamp"""
result = super().__add__(other)
if result == NotImplemented:
return NotImplemented
return TimestampType(result)
[docs]
def __radd__(self, other: Any) -> "TimestampType":
"""Duration + Timestamp -> Timestamp"""
result = super().__radd__(other)
if result == NotImplemented:
return NotImplemented
return TimestampType(result)
# For more information, check the typeshed definition
# https://github.com/python/typeshed/blob/master/stdlib/2and3/datetime.pyi
@overload # type: ignore
def __sub__(self, other: "TimestampType") -> "DurationType": ... # pragma: no cover
@overload
def __sub__(self, other: "DurationType") -> "TimestampType": ... # pragma: no cover
[docs]
def __sub__(
self, other: Union["TimestampType", "DurationType"]
) -> Union["TimestampType", "DurationType"]:
result = super().__sub__(other)
if result == NotImplemented:
return cast(DurationType, result)
if isinstance(result, datetime.timedelta):
return DurationType(result)
return TimestampType(result)
[docs]
@classmethod
def tz_name_lookup(cls, tz_name: str) -> Optional[datetime.tzinfo]:
"""
The :py:func:`dateutil.tz.gettz` may be extended with additional aliases.
.. TODO: Permit an extension into the timezone lookup.
Tweak ``celpy.celtypes.TimestampType.TZ_ALIASES``.
"""
tz_lookup = str(tz_name)
tz: Optional[datetime.tzinfo]
if tz_lookup in cls.TZ_ALIASES:
tz = timezone(cls.TZ_ALIASES[tz_lookup])
else:
try:
tz = cast(datetime.tzinfo, timezone(tz_lookup))
except pendulum.tz.exceptions.InvalidTimezone:
# ±hh:mm format...
tz = cls.tz_offset_parse(tz_name)
return tz
[docs]
@classmethod
def tz_offset_parse(cls, tz_name: str) -> Optional[datetime.tzinfo]:
tz_pat = re.compile(r"^([+-]?)(\d\d?):(\d\d)$")
tz_match = tz_pat.match(tz_name)
if not tz_match:
raise ValueError(f"Unparsable timezone: {tz_name!r}")
sign, hh, mm = tz_match.groups()
offset_min = (int(hh) * 60 + int(mm)) * (-1 if sign == "-" else +1)
offset = datetime.timedelta(seconds=offset_min * 60)
tz = datetime.timezone(offset)
return tz
[docs]
@staticmethod
def tz_parse(tz_name: Optional[str]) -> Optional[datetime.tzinfo]:
if tz_name:
tz = TimestampType.tz_name_lookup(tz_name)
return tz
else:
return timezone("UTC")
[docs]
def getDate(self, tz_name: Optional[StringType] = None) -> IntType:
new_tz = self.tz_parse(tz_name)
return IntType(self.astimezone(new_tz).day)
[docs]
def getDayOfMonth(self, tz_name: Optional[StringType] = None) -> IntType:
new_tz = self.tz_parse(tz_name)
return IntType(self.astimezone(new_tz).day - 1)
[docs]
def getDayOfWeek(self, tz_name: Optional[StringType] = None) -> IntType:
new_tz = self.tz_parse(tz_name)
return IntType(self.astimezone(new_tz).isoweekday() % 7)
[docs]
def getDayOfYear(self, tz_name: Optional[StringType] = None) -> IntType:
new_tz = self.tz_parse(tz_name)
working_date = self.astimezone(new_tz)
jan1 = datetime.datetime(working_date.year, 1, 1, tzinfo=new_tz)
days = working_date.toordinal() - jan1.toordinal()
return IntType(days)
[docs]
def getMonth(self, tz_name: Optional[StringType] = None) -> IntType:
new_tz = self.tz_parse(tz_name)
return IntType(self.astimezone(new_tz).month - 1)
[docs]
def getFullYear(self, tz_name: Optional[StringType] = None) -> IntType:
new_tz = self.tz_parse(tz_name)
return IntType(self.astimezone(new_tz).year)
[docs]
def getHours(self, tz_name: Optional[StringType] = None) -> IntType:
new_tz = self.tz_parse(tz_name)
return IntType(self.astimezone(new_tz).hour)
[docs]
def getMilliseconds(self, tz_name: Optional[StringType] = None) -> IntType:
new_tz = self.tz_parse(tz_name)
return IntType(self.astimezone(new_tz).microsecond // 1000)
[docs]
def getMinutes(self, tz_name: Optional[StringType] = None) -> IntType:
new_tz = self.tz_parse(tz_name)
return IntType(self.astimezone(new_tz).minute)
[docs]
def getSeconds(self, tz_name: Optional[StringType] = None) -> IntType:
new_tz = self.tz_parse(tz_name)
return IntType(self.astimezone(new_tz).second)
[docs]
class DurationType(datetime.timedelta):
"""
Implements google.protobuf.Duration
https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#duration
The protobuf implementation is an ordered pair of int64 seconds and int32 nanos.
Instead of a Tuple[int, int] we use a wrapper for :py:class:`datetime.timedelta`.
The definition once said this::
"type conversion, duration should be end with "s", which stands for seconds"
This is obsolete, however, considering the following issue.
See https://github.com/google/cel-spec/issues/138
This refers to the following implementation detail
::
// A duration string is a possibly signed sequence of
// decimal numbers, each with optional fraction and a unit suffix,
// such as "300ms", "-1.5h" or "2h45m".
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
The real regex, then is this::
[-+]?([0-9]*(\\.[0-9]*)?[a-z]+)+
"""
MaxSeconds = 315576000000
MinSeconds = -315576000000
NanosecondsPerSecond = 1000000000
scale: Dict[str, float] = {
"ns": 1e-9,
"us": 1e-6,
"µs": 1e-6,
"ms": 1e-3,
"s": 1.0,
"m": 60.0,
"h": 60.0 * 60.0,
"d": 24.0 * 60.0 * 60.0,
}
[docs]
def __new__(
cls: Type["DurationType"], seconds: Any, nanos: int = 0, **kwargs: Any
) -> "DurationType":
if isinstance(seconds, datetime.timedelta):
if not (cls.MinSeconds <= seconds.total_seconds() <= cls.MaxSeconds):
raise ValueError("range error: {seconds}")
return super().__new__(
cls,
days=seconds.days,
seconds=seconds.seconds,
microseconds=seconds.microseconds,
)
elif isinstance(seconds, int):
if not (cls.MinSeconds <= seconds <= cls.MaxSeconds):
raise ValueError("range error: {seconds}")
return super().__new__(cls, seconds=seconds, microseconds=nanos // 1000)
elif isinstance(seconds, str):
duration_pat = re.compile(r"^[-+]?([0-9]*(\.[0-9]*)?[a-z]+)+$")
duration_match = duration_pat.match(seconds)
if not duration_match:
raise ValueError(f"Invalid duration {seconds!r}")
# Consume the sign.
sign: float
if seconds.startswith("+"):
seconds = seconds[1:]
sign = +1
elif seconds.startswith("-"):
seconds = seconds[1:]
sign = -1
else:
sign = +1
# Sum the remaining time components: number * unit
try:
seconds = sign * fsum(
map(
lambda n_u: float(n_u.group(1)) * cls.scale[n_u.group(3)],
re.finditer(r"([0-9]*(\.[0-9]*)?)([a-z]+)", seconds),
)
)
except KeyError:
raise ValueError(f"Invalid duration {seconds!r}")
if not (cls.MinSeconds <= seconds <= cls.MaxSeconds):
raise ValueError("range error: {seconds}")
return super().__new__(cls, seconds=seconds)
else:
raise TypeError(f"Invalid initial value type: {type(seconds)}")
[docs]
def __repr__(self) -> str:
return f"{self.__class__.__name__}({str(self)!r})"
[docs]
def __str__(self) -> str:
return "{0}s".format(int(self.total_seconds()))
[docs]
def __add__(self, other: Any) -> "DurationType":
"""
This doesn't need to handle the rich variety of TimestampType overloadds.
This class only needs to handle results of duration + duration.
A duration + timestamp is not implemented by the timedelta superclass;
it is handled by the datetime superclass that implementes timestamp + duration.
"""
result = super().__add__(other)
if result == NotImplemented:
return cast(DurationType, result)
# This is handled by TimestampType; this is here for completeness, but isn't used.
if isinstance(result, (datetime.datetime, TimestampType)):
return TimestampType(result) # pragma: no cover
return DurationType(result)
[docs]
def __radd__(self, other: Any) -> "DurationType": # pragma: no cover
"""
This doesn't need to handle the rich variety of TimestampType overloadds.
Most cases are handled by TimeStamp.
"""
result = super().__radd__(other)
if result == NotImplemented:
return cast(DurationType, result)
# This is handled by TimestampType; this is here for completeness, but isn't used.
if isinstance(result, (datetime.datetime, TimestampType)):
return TimestampType(result)
return DurationType(result)
[docs]
def getHours(self, tz_name: Optional[str] = None) -> IntType:
assert tz_name is None
return IntType(int(self.total_seconds() / 60 / 60))
[docs]
def getMilliseconds(self, tz_name: Optional[str] = None) -> IntType:
assert tz_name is None
return IntType(int(self.total_seconds() * 1000))
[docs]
def getMinutes(self, tz_name: Optional[str] = None) -> IntType:
assert tz_name is None
return IntType(int(self.total_seconds() / 60))
[docs]
def getSeconds(self, tz_name: Optional[str] = None) -> IntType:
assert tz_name is None
return IntType(int(self.total_seconds()))
[docs]
class FunctionType:
"""
We need a concrete Annotation object to describe callables to celpy.
We need to describe functions as well as callable objects.
The description would tend to shadow ``typing.Callable``.
An ``__isinstance__()`` method, for example, may be helpful for run-time type-checking.
Superclass for CEL extension functions that are defined at run-time.
This permits a formal annotation in the environment construction that creates
an intended type for a given name.
This allows for some run-time type checking to see if the actual object binding
matches the declared type binding.
Also used to define protobuf classes provided as an annotation.
We *could* define this as three overloads to cover unary, binary, and tertiary cases.
"""
[docs]
def __call__(self, *args: Value, **kwargs: Value) -> Value:
raise NotImplementedError
[docs]
class PackageType(MapType):
"""
A package of message types, usually protobuf.
TODO: This may not be needed.
"""
pass
[docs]
class MessageType(MapType):
"""
An individual protobuf message definition. A mapping from field name to field value.
See Scenario: "message_literal" in the parse.feature. This is a very deeply-nested
message (30? levels), but the navigation to "payload" field seems to create a default
value at the top level.
"""
[docs]
def __init__(self, *args: Value, **fields: Value) -> None:
if args and len(args) == 1:
super().__init__(cast(Mapping[Value, Value], args[0]))
elif args and len(args) > 1:
raise TypeError(r"Expected dictionary or fields, not {args!r}")
else:
super().__init__({StringType(k): v for k, v in fields.items()})
# def get(self, field: Any, default: Optional[Value] = None) -> Value:
# """
# Alternative implementation with descent to locate a deeply-buried field.
# It seemed like this was the defined behavior. It turns it, it isn't.
# The code is here in case we're wrong and it really is the defined behavior.
#
# Note. There is no default provision in CEL.
# """
# if field in self:
# return super().get(field)
#
# def descend(message: MessageType, field: Value) -> MessageType:
# if field in message:
# return message
# for k in message.keys():
# found = descend(message[k], field)
# if found is not None:
# return found
# return None
#
# sub_message = descend(self, field)
# if sub_message is None:
# return default
# return sub_message.get(field)
[docs]
class TypeType:
"""
Annotation used to mark protobuf type objects.
We map these to CELTypes so that type name testing works.
"""
type_name_mapping = {
"google.protobuf.Duration": DurationType,
"google.protobuf.Timestamp": TimestampType,
"google.protobuf.Int32Value": IntType,
"google.protobuf.Int64Value": IntType,
"google.protobuf.UInt32Value": UintType,
"google.protobuf.UInt64Value": UintType,
"google.protobuf.FloatValue": DoubleType,
"google.protobuf.DoubleValue": DoubleType,
"google.protobuf.Value": MessageType,
"google.protubuf.Any": MessageType, # Weird.
"google.protobuf.Any": MessageType,
"list_type": ListType,
"map_type": MapType,
"map": MapType,
"list": ListType,
"string": StringType,
"bytes": BytesType,
"bool": BoolType,
"int": IntType,
"uint": UintType,
"double": DoubleType,
"null_type": type(None),
"STRING": StringType,
"BOOL": BoolType,
"INT64": IntType,
"UINT64": UintType,
"INT32": IntType,
"UINT32": UintType,
"BYTES": BytesType,
"DOUBLE": DoubleType,
}
[docs]
def __init__(self, value: Any = "") -> None:
if isinstance(value, str) and value in self.type_name_mapping:
self.type_reference = self.type_name_mapping[value]
elif isinstance(value, str):
try:
self.type_reference = eval(value)
except (NameError, SyntaxError):
raise TypeError(f"Unknown type {value!r}")
else:
self.type_reference = value.__class__
[docs]
def __eq__(self, other: Any) -> bool:
return (
other == self.type_reference or isinstance(other, self.type_reference) # noqa: W503
)