Module acquisition.switch_rules

Classes

class Action (plugin_name, step_name)
Expand source code
class Action(object):
    """Target of a switch rule, identified by a plugin name and a step name.

    Instances compare and hash by value (concrete class name, plugin name,
    step name) so that identical actions gathered from several rules
    collapse to a single entry when they are stored in a ``set``.

    Args:
        plugin_name: name of the target plugin.
        step_name: name of the target step inside that plugin.
    """

    def __init__(self, plugin_name, step_name):
        self.plugin_name = plugin_name
        self.step_name = step_name

    def _key(self):
        """Return the tuple that defines this action's identity."""
        # The class name is part of the key so that two different Action
        # subclasses aimed at the same plugin/step remain distinct.
        return (self.__class__.__name__, self.plugin_name, self.step_name)

    def __eq__(self, other):
        """Return True when *other* is an Action with the same identity."""
        if not isinstance(other, Action):
            return NotImplemented
        return self._key() == other._key()

    def __hash__(self):
        """Hash the identity tuple (kept consistent with ``__eq__``)."""
        return hash(self._key())

Subclasses

class AlwaystrueRulesBlock (params=[])
Expand source code
class AlwaystrueRulesBlock(ZeroParameterRulesBlock):
    """Parameterless rules block whose rules match unconditionally."""

    def eval(self, xaf, rule_pattern):
        """Log the evaluation and report a match; both arguments are unused."""
        outcome = True
        LOGGER.debug("alwaystrue switch rule => True")
        return outcome

Ancestors

Methods

def eval(self, xaf, rule_pattern)
class BadSyntax (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class BadSyntax(Exception):
    """Raised when a switch rules file cannot be interpreted."""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class CopyAction (plugin_name, step_name)
Expand source code
class CopyAction(Action):
    """Action whose textual form is ``copy to <plugin_name>/<step_name>``."""

    def __str__(self):
        """Return the human-readable description used in log messages."""
        target = (self.plugin_name, self.step_name)
        return "copy to %s/%s" % target

Ancestors

class EqualRulesBlock (params=[])
Expand source code
class EqualRulesBlock(OneParameterRulesBlock):
    """Rules block matching when the tag value equals the rule pattern."""

    def eval(self, xaf, rule_pattern):
        """Compare the value read from *xaf* with *rule_pattern*.

        Returns:
            True when both are equal, False otherwise.
        """
        tag_value = self.get_val_from_xaf(xaf)
        matched = tag_value == rule_pattern
        message = "(%s == %s) switch rule => %s" % (
            tag_value, rule_pattern, matched)
        LOGGER.debug(message)
        return matched

Ancestors

Subclasses

Methods

def eval(self, xaf, rule_pattern)
class FnmatchRulesBlock (params=[])
Expand source code
class FnmatchRulesBlock(OneParameterRulesBlock):
    """Rules block matching the tag value against a shell-style pattern."""

    def eval(self, xaf, rule_pattern):
        """Return ``fnmatch.fnmatch(value, rule_pattern)`` for the tag value.

        The value is obtained through ``get_val_from_xaf(xaf)``.
        """
        tag_value = self.get_val_from_xaf(xaf)
        matched = fnmatch.fnmatch(tag_value, rule_pattern)
        message = "fnmatch.fnmatch(%s, %s) switch rule => %s" % (
            tag_value, rule_pattern, matched)
        LOGGER.debug(message)
        return matched

Ancestors

Subclasses

Methods

def eval(self, xaf, rule_pattern)
class HardlinkAction (plugin_name, step_name)
Expand source code
class HardlinkAction(Action):
    """Action whose textual form is ``hardlink to <plugin_name>/<step_name>``."""

    def __str__(self):
        """Return the human-readable description used in log messages."""
        target = (self.plugin_name, self.step_name)
        return "hardlink to %s/%s" % target

Ancestors

class NotequalRulesBlock (params=[])
Expand source code
class NotequalRulesBlock(EqualRulesBlock):
    """Logical negation of :class:`EqualRulesBlock`."""

    def eval(self, xaf, rule_pattern):
        """Return True exactly when the equality rule does not match."""
        return not EqualRulesBlock.eval(self, xaf, rule_pattern)

Ancestors

Methods

def eval(self, xaf, rule_pattern)
class NotfnmatchRulesBlock (params=[])
Expand source code
class NotfnmatchRulesBlock(FnmatchRulesBlock):
    """Logical negation of :class:`FnmatchRulesBlock`."""

    def eval(self, xaf, rule_pattern):
        """Return True exactly when the fnmatch rule does not match."""
        return not FnmatchRulesBlock.eval(self, xaf, rule_pattern)

Ancestors

Methods

def eval(self, xaf, rule_pattern)
class NotregexRulesBlock (params=[])
Expand source code
class NotregexRulesBlock(RegexRulesBlock):
    """Logical negation of :class:`RegexRulesBlock`."""

    def eval(self, xaf, rule_pattern):
        """Return True exactly when the regex rule does not match."""
        return not RegexRulesBlock.eval(self, xaf, rule_pattern)

Ancestors

Methods

def eval(self, xaf, rule_pattern)
class OneParameterRulesBlock (params=[])
Expand source code
class OneParameterRulesBlock(RulesBlock):
    """Rules block that requires exactly one parameter (a tag name).

    Args:
        params: list holding the single parameter. ``None`` is treated as an
            empty list, which fails the arity check below.

    Raises:
        Exception: if ``params`` does not contain exactly one item.
    """

    def __init__(self, params=None):
        # Avoid a mutable ``[]`` default, which would be a single list object
        # shared by every call relying on the default.
        if params is None:
            params = []
        RulesBlock.__init__(self, params)
        if len(params) != 1:
            raise Exception("This rules block must have exactly one argument")

    def get_val_from_xaf(self, xaf):
        """Return the UTF-8 decoded value of the configured tag of *xaf*.

        The tag is looked up in ``xaf.tags`` under ``self.params[0]``; when
        it is missing, the placeholder string ``{NOT_FOUND}`` is returned.
        """
        return xaf.tags.get(self.params[0], b"{NOT_FOUND}").decode('utf8')

Ancestors

Subclasses

Methods

def get_val_from_xaf(self, xaf)
class PythonRulesBlock (*args, **kwargs)
Expand source code
class PythonRulesBlock(ZeroParameterRulesBlock):
    """Rules block delegating the decision to a user-supplied function.

    Each rule pattern has the form ``[sys_path:]module_path.func_name``.
    Loaded functions are cached per pattern on the instance.
    """

    def __init__(self, *args, **kwargs):
        ZeroParameterRulesBlock.__init__(self, *args, **kwargs)
        # rule_pattern -> loaded callable
        self._funcs = {}

    def get_func(self, rule_pattern):
        """Return the callable designated by *rule_pattern*, loading it once.

        The module is executed from the file
        ``os.path.join(sys_path, module_path + ".py")`` while ``sys_path``
        is temporarily prepended to ``sys.path``.
        """
        if rule_pattern in self._funcs:
            return self._funcs[rule_pattern]

        pieces = rule_pattern.split(':')
        if len(pieces) > 1:
            sys_path, func_path = pieces[0], pieces[1]
        else:
            sys_path, func_path = "", rule_pattern

        # Everything before the last dot is the module, the rest the function.
        module_path, _, func_name = func_path.rpartition('.')
        source_file = os.path.join(sys_path, module_path + ".py")

        with add_sys_path(sys_path):
            importlib.invalidate_caches()
            spec = importlib.util.spec_from_file_location(
                "my_custom_function", source_file)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            self._funcs[rule_pattern] = getattr(module, func_name)

        return self._funcs[rule_pattern]

    def eval(self, xaf, rule_pattern):
        """Call the function behind *rule_pattern* with *xaf*.

        A warning is logged when the call takes more than one second.

        Returns:
            Whatever the user function returned.
        """
        func = self.get_func(rule_pattern)
        started = time.time()
        outcome = func(xaf)
        elapsed = time.time() - started
        if elapsed > 1.0:
            LOGGER.warning("more than 1s passed in %s function", func)
        LOGGER.debug("%s(xaf) switch rule => %s" % (func, outcome))
        return outcome

Ancestors

Methods

def eval(self, xaf, rule_pattern)
def get_func(self, rule_pattern)
class RegexRulesBlock (params=[])
Expand source code
class RegexRulesBlock(OneParameterRulesBlock):
    """Rules block matching the tag value against a regular expression."""

    def eval(self, xaf, rule_pattern):
        """Apply ``re.match(rule_pattern, value)`` to the tag value of *xaf*.

        Returns:
            The ``re.match`` result: a match object (truthy) when the
            pattern matches at the start of the value, ``None`` otherwise.
        """
        val = self.get_val_from_xaf(xaf)
        res = re.match(rule_pattern, val)
        # Log the arguments in the order they are really passed to re.match
        # (pattern first, then the string) so the trace is not misleading.
        LOGGER.debug("re.match(%s, %s) switch rule => %s" %
                     (rule_pattern, val, res))
        return res

Ancestors

Subclasses

Methods

def eval(self, xaf, rule_pattern)
class RulesBlock (params=[])
Expand source code
class RulesBlock(object):
    """Base class for a group of switch rules sharing one evaluation method.

    Subclasses implement :meth:`eval`; :meth:`evaluate` applies it to every
    registered rule and collects the actions of the rules that match.

    Args:
        params: list of parameters for the block. ``None`` (the default)
            means "no parameter" and yields a fresh empty list.
    """

    def __init__(self, params=None):
        # A ``[]`` default would be one list object shared by every instance
        # created without arguments; build a new one per instance instead.
        self.params = [] if params is None else params
        self.switch_rules = []

    def add_switch_rule(self, switch_rule):
        """Register *switch_rule* (an object with ``pattern``/``actions``)."""
        self.switch_rules.append(switch_rule)

    def eval(self, xaf, rule_pattern):
        """Tell whether *xaf* matches *rule_pattern*; must be overridden.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def evaluate(self, xaf):
        """Evaluate every registered rule against *xaf*.

        A rule whose evaluation raises is logged and skipped, so one faulty
        rule does not prevent the others from being evaluated.

        Returns:
            A tuple ``(actions, rules_count)``: the set of actions of all
            matching rules and the number of rules held by this block.
        """
        actions = set()
        for rule in self.switch_rules:
            try:
                res = self.eval(xaf, rule.pattern)
            except Exception:
                LOGGER.exception("exception during rule block %s eval on %s" %
                                 (self, rule.pattern))
            else:
                if res:
                    LOGGER.debug("=> adding actions: %s" %
                                 ", ".join([str(x) for x in rule.actions]))
                    actions.update(rule.actions)
        return (actions, len(self.switch_rules))

    def get_virtual_targets(self):
        """Return the set of ``(plugin_name, step_name)`` pairs of all rules."""
        return {
            (action.plugin_name, action.step_name)
            for rule in self.switch_rules
            for action in rule.actions
        }

Subclasses

Methods

def add_switch_rule(self, switch_rule)
def eval(self, xaf, rule_pattern)
def evaluate(self, xaf)
def get_virtual_targets(self)
class RulesReader
Expand source code
class RulesReader(object):
    """Build a :class:`RulesSet` from a switch rules configuration file."""

    def __init__(self):
        self.log = LOGGER
        self._raw_lines = []

    def rules_block_factory(self, rule_type, params=None):
        """Instantiate the ``<Rule_type>RulesBlock`` class with *params*.

        Args:
            rule_type: rules block type; it is capitalized and suffixed with
                ``RulesBlock`` to find the class among the module globals.
            params: list of parameters for the block; ``None`` (the default)
                yields a fresh empty list.

        Raises:
            BadSyntax: if the class does not exist or refuses *params*.
        """
        # A ``[]`` default would be handed to, and stored by, every rules
        # block created without parameters; give each one its own list.
        if params is None:
            params = []
        c = "%sRulesBlock" % rule_type.capitalize()
        try:
            klass = globals()[c]
            return klass(params)
        except Exception:
            self.log.exception("probably a wrong rules block type: %s" %
                               rule_type)
            raise BadSyntax()

    def action_factory(self, action_type, *args, **kwargs):
        """Instantiate the ``<Action_type>Action`` class with the given args.

        Raises:
            BadSyntax: if the class does not exist or cannot be built.
        """
        c = "%sAction" % action_type.capitalize()
        try:
            klass = globals()[c]
            return klass(*args, **kwargs)
        except Exception:
            self.log.exception("probably a wrong action type: %s" %
                               action_type)
            raise BadSyntax()

    def read(self, path, section_prefix="switch_rules*"):
        """Parse the file at *path* and return the resulting rules set.

        Sections named ``<section_prefix>:<type>:<params>`` (``params`` being
        comma separated) or ``<section_prefix>:<type>`` each create a rules
        block; other sections are ignored. Inside a section every option is
        a rule: the option name is the pattern and the value a comma
        separated list of ``plugin/step`` actions.

        Args:
            path: path of the configuration file.
            section_prefix: fnmatch pattern that section names must start
                with.

        Returns:
            The populated :class:`RulesSet`.

        Raises:
            BadSyntax: on an unknown block type or a malformed action.
        """
        self.log = self.log.bind(path=path)
        result = RulesSet()
        x = OpinionatedConfigParser(delimiters=("=",), comment_prefixes=("#",))
        # Keep option names (the rule patterns) case-sensitive.
        x.optionxform = str
        x.read([path])
        rules_block = None
        for section in x.sections():
            if fnmatch.fnmatch(section, "%s:*:*" % section_prefix):
                tempo = section.split(':')
                typ = tempo[1]
                # Parameters may themselves contain ':' so re-join them
                # before splitting on commas.
                params = ':'.join(tempo[2:]).split(',')
                rules_block = self.rules_block_factory(typ, params)
                result.add_rules_block(rules_block)
            elif fnmatch.fnmatch(section, "%s:*" % section_prefix):
                tempo = section.split(':')
                typ = tempo[1]
                rules_block = self.rules_block_factory(typ)
                result.add_rules_block(rules_block)
            else:
                continue
            for option in x.options(section):
                val = x.get(section, option)
                actions = [y.strip() for y in val.split(',')]
                # '=' and '#' cannot appear literally in an option name
                # (delimiter / comment prefix); these placeholders stand in
                # for them.
                pattern = option.replace('@@@@@@', '=').replace('~~~~~~', '#')
                switch_rule = SwitchRule(pattern)
                for action in actions:
                    tempo2 = action.split('/')
                    if len(tempo2) == 2:
                        if tempo2[1].endswith('*'):
                            # A trailing '*' on the step selects a hardlink
                            # action; the marker is stripped from the name.
                            act = self.action_factory("hardlink", tempo2[0],
                                                      tempo2[1][:-1])
                        else:
                            act = self.action_factory("copy", tempo2[0],
                                                      tempo2[1])
                    else:
                        self.log.error("bad action [%s] for section [%s] "
                                       "and pattern: %s" % (action, section,
                                                            option))
                        raise BadSyntax()
                    switch_rule.add_action(act)
                rules_block.add_switch_rule(switch_rule)
        return result

Methods

def action_factory(self, action_type, *args, **kwargs)
def read(self, path, section_prefix='switch_rules*')
def rules_block_factory(self, rule_type, params=[])
class RulesSet
Expand source code
class RulesSet(object):
    """Ordered collection of rules blocks evaluated together."""

    def __init__(self):
        self.rule_blocks = list()

    def add_rules_block(self, rule_block):
        """Append *rule_block* to the collection."""
        self.rule_blocks.append(rule_block)

    def evaluate(self, xaf):
        """Evaluate every rules block against *xaf*.

        Returns:
            The set of actions produced by all blocks. The number of rules
            and the elapsed time are reported in a debug log line.
        """
        collected = set()
        rules_seen = 0
        started = time.time()
        for block in self.rule_blocks:
            block_actions, block_rules = block.evaluate(xaf)
            collected.update(block_actions)
            rules_seen += block_rules
        finished = time.time()
        elapsed_ms = int(1000.0 * (finished - started))
        LOGGER.debug("%i rules evaluated in a total of %i ms",
                     rules_seen, elapsed_ms)
        return collected

    def get_virtual_targets(self):
        """Return the union of the virtual targets of every rules block."""
        merged = set()
        for block in self.rule_blocks:
            merged.update(block.get_virtual_targets())
        return merged

Methods

def add_rules_block(self, rule_block)
def evaluate(self, xaf)
def get_virtual_targets(self)
class SwitchRule (pattern)
Expand source code
class SwitchRule(object):
    """A pattern together with the actions attached to it."""

    def __init__(self, pattern):
        self.actions = list()
        self.pattern = pattern

    def add_action(self, action):
        """Append *action* to this rule's action list."""
        self.actions.extend((action,))

Methods

def add_action(self, action)
class ZeroParameterRulesBlock (params=[])
Expand source code
class ZeroParameterRulesBlock(RulesBlock):
    """Rules block that accepts no parameter.

    Args:
        params: must be empty; ``None`` (the default) is treated as an
            empty list.

    Raises:
        Exception: if ``params`` contains at least one item.
    """

    def __init__(self, params=None):
        # Build a fresh list per instance: a ``[]`` default would be stored
        # on, and therefore shared by, every block created without arguments.
        if params is None:
            params = []
        RulesBlock.__init__(self, params)
        if len(params) != 0:
            raise Exception("This rules block must have zero argument")

Ancestors

Subclasses

class add_sys_path (path)
Expand source code
class add_sys_path():
    """Context manager that temporarily puts *path* first in ``sys.path``.

    Nothing is done when *path* is ``None`` or the empty string.
    """

    def __init__(self, path):
        self.path = path

    def __enter__(self):
        if self.path is None or self.path == "":
            return
        sys.path.insert(0, self.path)

    def __exit__(self, exc_type, exc_value, traceback):
        if self.path is None or self.path == "":
            return
        try:
            sys.path.remove(self.path)
        except ValueError:
            # The entry was already removed by the managed code.
            pass