#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# rule_engine/engine.py
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of the project nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import datetime
import functools
import math
from . import ast
from . import errors
from . import parser
import dateutil.tz
def resolve_attribute(thing, name):
    """
    A replacement resolver function for looking up symbols as members of
    *thing*. This is effectively the same as ``thing.name``. The *thing* object
    can be a :py:class:`~collections.namedtuple`, a custom Python class or any
    other object. Each of the members of *thing* must be of a compatible data
    type.

    The *name* is split on dots and each part is looked up on the result of
    the previous lookup, so ``'a.b'`` resolves to ``thing.a.b``.

    .. warning::
        This effectively exposes all members of *thing*. If any members are
        sensitive, then a custom resolver should be used that checks *name*
        against a whitelist of attributes that are allowed to be accessed.

    :param thing: The object on which the *name* attribute will be accessed.
    :param str name: The symbol name that is being resolved.
    :return: The value for the corresponding attribute *name*.
    :raises errors.SymbolResolutionError: If any part of *name* is not an
        attribute of the object it is looked up on. The error carries the
        part that failed to resolve.
    """
    for name_part in name.split('.'):
        # A single getattr (instead of hasattr followed by getattr) avoids
        # invoking property getters twice for every resolved part.
        try:
            thing = getattr(thing, name_part)
        except AttributeError:
            raise errors.SymbolResolutionError(name_part) from None
    return thing
def resolve_item(thing, name):
    """
    A resolver function for looking up symbols as items from an object (*thing*)
    which supports the :py:class:`~collections.abc.Mapping` interface, such as a
    dictionary. This is effectively the same as ``thing['name']``. Each of the
    values in *thing* must be of a compatible data type.

    :param thing: The object from which the *name* item will be accessed.
    :param str name: The symbol name that is being resolved.
    :return: The value for the corresponding item *name*.
    :raises errors.SymbolResolutionError: If *name* is not in *thing*.
    """
    # The membership test is deliberate: indexing first and catching KeyError
    # would let containers that create missing keys on access (for example
    # collections.defaultdict) resolve symbols that are not actually present.
    if name not in thing:
        raise errors.SymbolResolutionError(name)
    return thing[name]
def to_default_resolver(resolver, default_value=None):
    """
    Convert the specified *resolver* function (such as
    :py:func:`~.resolve_attribute` or :py:func:`~.resolve_item`) into one which
    returns the specified *default_value* when the symbol fails to resolve.
    Converted resolver functions will not raise
    :py:exc:`~errors.SymbolResolutionError`, instead they will return the
    default value.

    .. versionadded:: 1.1.0

    :param resolver: The resolver function to convert.
    :type resolver: function
    :param default_value: The Python value to use as the default for symbols
        which can not be resolved.
    :return: A new resolver function.
    :rtype: function
    """
    # use DataType.from_value to raise a TypeError if value is not of a
    # compatible data type; the return value itself is not needed
    ast.DataType.from_value(default_value)

    @functools.wraps(resolver)
    def default_resolver(thing, name):
        try:
            return resolver(thing, name)
        except errors.SymbolResolutionError:
            return default_value
    return default_resolver
def to_recursive_resolver(resolver):
    """
    Convert the specified *resolver* function (such as
    :py:func:`~.resolve_attribute` or :py:func:`~.resolve_item`) into one which
    splits the symbol name on dots and recursively resolves each one on the
    specified thing parameter.

    When a part fails to resolve, the raised
    :py:exc:`~errors.SymbolResolutionError` names the dotted path up to and
    including the failing part and keeps the scope of the original error.

    .. versionadded:: 1.1.0

    :param resolver: The resolver function to convert.
    :type resolver: function
    :return: A new resolver function.
    :rtype: function
    """
    split_on = '.'

    @functools.wraps(resolver)
    def recursive_resolver(thing, name):
        parts = name.split(split_on)
        for idx, part in enumerate(parts):
            try:
                # each resolved value becomes the container for the next part
                thing = resolver(thing, part)
            except errors.SymbolResolutionError as error:
                # report the full path that was reached, not just the last part
                symbol_name = split_on.join(parts[:idx + 1])
                raise errors.SymbolResolutionError(symbol_name, symbol_scope=error.symbol_scope) from None
        return thing
    return recursive_resolver
def _type_resolver(type_map, name):
    """
    Look up the type registered for the symbol *name* in *type_map*.

    :param type_map: A mapping of symbol names to their types.
    :param str name: The symbol name to look up.
    :return: The value stored in *type_map* for *name*.
    :raises errors.SymbolResolutionError: If *name* is not in *type_map*.
    """
    if name not in type_map:
        raise errors.SymbolResolutionError(name)
    return type_map[name]
def type_resolver_from_dict(dictionary):
    """
    Return a function suitable for use as the *type_resolver* for a
    :py:class:`.Context` instance from a dictionary. If any of the values within
    the dictionary are not of a compatible data type, a :py:exc:`TypeError` will
    be raised. Additionally, the resulting function will raise a
    :py:exc:`~rule_engine.errors.SymbolResolutionError` if the symbol name does
    not exist within the dictionary.

    :param dict dictionary: A dictionary (or any other object which supports the
        :py:class:`~collections.abc.Mapping` interface) from which to create the
        callback function.
    :return: The callback function.
    :rtype: function
    """
    # Values that are already DataType members are kept as-is; anything else
    # is converted with DataType.from_value. The map is built once, up front,
    # so the returned resolver only performs lookups.
    type_map = {
        key: value if isinstance(value, ast.DataType) else ast.DataType.from_value(value)
        for key, value in dictionary.items()
    }
    return functools.partial(_type_resolver, type_map)
class Context(object):
    """
    An object defining the context for a rule's evaluation. This can be used to
    change the behavior of certain aspects of the rule such as how symbols are
    resolved and what regex flags should be used.
    """
    def __init__(self, regex_flags=0, resolver=None, type_resolver=None, default_timezone='local'):
        """
        :param int regex_flags: The flags to provide to functions in the
            :py:mod:`re` module.
        :param resolver: An optional callback function to use in place of
            :py:meth:`.resolve`.
        :param type_resolver: An optional callback function to use in place of
            :py:meth:`.resolve_type`.
        :param default_timezone: The default timezone to apply to
            :py:class:`~datetime.datetime` instances which do not have one
            specified. This is necessary for comparison operations. The value
            should either be a :py:class:`~datetime.tzinfo` instance, or a
            string. If *default_timezone* is a string it must be one of the
            specially supported (case-insensitive) values of "local" or "utc".
        :type default_timezone: str, :py:class:`~datetime.tzinfo`
        :raises ValueError: If *default_timezone* is a string other than
            "local" or "utc".
        :raises TypeError: If *default_timezone* is neither a string nor a
            :py:class:`~datetime.tzinfo` instance.
        """
        self.regex_flags = regex_flags
        """
        The flags to provide to the :py:func:`~re.match` and
        :py:func:`~re.search` functions when matching or searching for patterns.
        """
        self.symbols = set()
        """
        The symbols that are referred to by the rule. Some or all of these will
        need to be resolved at evaluation time. This attribute can be used after
        a rule is generated to ensure that all symbols are valid before it is
        evaluated.
        """
        if isinstance(default_timezone, str):
            # string values are matched case-insensitively
            default_timezone = default_timezone.lower()
            if default_timezone == 'local':
                default_timezone = dateutil.tz.tzlocal()
            elif default_timezone == 'utc':
                default_timezone = dateutil.tz.tzutc()
            else:
                raise ValueError('unsupported timezone: ' + default_timezone)
        elif not isinstance(default_timezone, datetime.tzinfo):
            raise TypeError('invalid default_timezone type')
        self.default_timezone = default_timezone
        # without a type resolver every symbol is reported as UNDEFINED
        self.__type_resolver = type_resolver or (lambda _: ast.DataType.UNDEFINED)
        self.__resolver = resolver or resolve_item

    def resolve(self, thing, name, scope=None):
        """
        The method to use for resolving symbols names to values. This function
        must return a compatible value for the specified symbol name. When a
        *scope* is defined, this function handles the resolution itself, however
        when the *scope* is ``None`` the resolver specified in
        :py:meth:`~.Context.__init__` is used which defaults to
        :py:func:`resolve_item`.

        The only scope handled here is ``'built-in'``, which provides the
        symbols ``f.e``, ``f.pi``, ``d.now`` and ``d.today``. The two date
        symbols are computed in :py:attr:`.default_timezone`.

        :param thing: The object from which the *name* item will be accessed.
        :param str name: The symbol name that is being resolved.
        :param str scope: The optional scope in which *name* is resolved.
        :return: The value for the corresponding attribute *name*.
        :raises errors.SymbolResolutionError: If *scope* is not ``None`` and
            *name* is not a symbol of that scope.
        """
        if scope is None:
            return self.__resolver(thing, name)
        if scope == 'built-in':
            if name == 'f.e':
                return math.e
            elif name == 'f.pi':
                return math.pi
            elif name == 'd.now':
                # Build the value in the default timezone. A naive local time
                # that is later labelled with a non-local default timezone
                # (e.g. UTC) would denote the wrong instant.
                return datetime.datetime.now(self.default_timezone)
            elif name == 'd.today':
                # same reference zone as d.now, so both agree on the date
                return datetime.datetime.now(self.default_timezone).date()
        raise errors.SymbolResolutionError(name, symbol_scope=scope)

    def resolve_type(self, name):
        """
        A method for providing type hints while the rule is being generated.
        This can be used to ensure that all symbol names are valid and that the
        types are appropriate for the operations being performed. It must then
        return one of the compatible data type constants if the symbol is valid
        or raise an exception. The default behavior is to return
        :py:data:`~rule_engine.ast.DataType.UNDEFINED` for all symbols.

        :param str name: The symbol name to provide a type hint for.
        :return: The type of the specified symbol
        """
        return self.__type_resolver(name)
class Rule(object):
    """
    A rule which parses a string with a logical expression and can then evaluate
    an arbitrary object for whether or not it matches based on the constraints
    of the expression.
    """
    parser = parser.Parser()
    """
    The :py:class:`~rule_engine.parser.Parser` instance that will be used for
    parsing the rule text into a compatible abstract syntax tree (AST) for
    evaluation.
    """
    def __init__(self, text, context=None):
        """
        :param str text: The text of the logical expression.
        :param context: The context to use for evaluating the expression on
            arbitrary objects. This can be used to change the default behavior.
            The default context is :py:class:`.Context` but any object providing
            the same interface (such as a subclass) can be used.
        :type context: :py:class:`.Context`
        """
        # Only fall back to the default when no context was supplied; a
        # truthiness test would also discard a caller's context object that
        # happens to evaluate as false.
        if context is None:
            context = Context()
        self.text = text
        self.context = context
        self.statement = self.parser.parse(text, context)

    def __repr__(self):
        return "<{0} text={1!r} >".format(self.__class__.__name__, self.text)

    def __str__(self):
        return self.text

    def filter(self, things):
        """
        A convenience function for iterating over *things* and yielding each
        member that :py:meth:`.matches` return True for.

        :param things: The collection of objects to iterate over.
        """
        for thing in things:
            if self.matches(thing):
                yield thing

    @classmethod
    def is_valid(cls, text, context=None):
        """
        Test whether or not the rule is syntactically correct. This verifies the
        grammar is well structured and that there are no type compatibility
        issues regarding literals or symbols with known types (see
        :py:meth:`~.Context.resolve_type` for specifying symbol type
        information).

        :param str text: The text of the logical expression.
        :param context: The context as would be passed to the
            :py:meth:`.__init__` method. This can be used for specifying symbol
            type information.
        :return: Whether or not the expression is well formed and appears valid.
        :rtype: bool
        """
        if context is None:
            context = Context()
        try:
            cls.parser.parse(text, context)
        except errors.EngineError:
            return False
        return True

    def evaluate(self, thing):
        """
        Evaluate the rule against the specified *thing* and return the value.
        This can be used to, for example, apply the symbol resolver.

        :param thing: The object on which to apply the rule.
        :return: The value the rule evaluates to. Unlike the :py:meth:`.matches`
            method, this is not necessarily a boolean.
        """
        return self.statement.evaluate(thing)

    def matches(self, thing):
        """
        Evaluate the rule against the specified *thing*. This will either return
        whether *thing* matches, or an exception will be raised.

        :param thing: The object on which to apply the rule.
        :return: Whether or not the rule matches.
        :rtype: bool
        """
        return bool(self.statement.evaluate(thing))

    def to_graphviz(self):
        """
        Generate a diagram of the parsed rule's AST using GraphViz.

        :return: The rule diagram.
        :rtype: :py:class:`graphviz.Digraph`
        """
        # imported here so the graphviz package is only needed when a diagram
        # is actually requested
        import graphviz
        digraph = graphviz.Digraph(comment=self.text)
        self.statement.to_graphviz(digraph)
        return digraph
class DebugRule(Rule):
    """
    A :py:class:`.Rule` whose text is parsed by a parser created with
    ``debug=True``. The parser is created for each instance rather than being
    shared at the class level.
    """
    # No class-level parser: each instance builds its own in __init__.
    parser = None

    def __init__(self, *args, **kwargs):
        """
        Accepts the same arguments as :py:meth:`.Rule.__init__`.
        """
        # must be assigned before Rule.__init__ runs because that is where the
        # text is parsed using self.parser
        self.parser = parser.Parser(debug=True)
        super(DebugRule, self).__init__(*args, **kwargs)

    @classmethod
    def is_valid(cls, text, context=None):
        """
        Test whether or not the rule is syntactically correct, using a parser
        created with ``debug=True``.

        The inherited implementation reads the class-level ``parser``
        attribute, which is ``None`` for this class, so it is overridden here
        to create a parser for the check.

        :param str text: The text of the logical expression.
        :param context: The context as would be passed to the
            :py:meth:`.__init__` method.
        :return: Whether or not the expression is well formed and appears valid.
        :rtype: bool
        """
        if context is None:
            context = Context()
        try:
            parser.Parser(debug=True).parse(text, context)
        except errors.EngineError:
            return False
        return True