from acquisition.utils import MFMODULE_RUNTIME_HOME
from acquisition.utils import _set_custom_environment
from acquisition.utils import _get_tmp_filepath
from acquisition.utils import _make_config_file_parser_class
from mfutil import get_unique_hexa_identifier
from acquisition.utils import _get_current_utc_datetime_with_ms
import re
import os
import sys
import six
import mflog
import traceback
import configargparse
from functools import partial
class AcquisitionBase(object):
"""Abstract class to describe an acquisition base.
You have to override this class.
Attributes:
args (Namespace): argparser Namespace object with parsed cli args.
unit_tests (boolean): if True, we are in unit tests mode.
unit_tests_args (string): cli args (for unit tests).
__logger (Logger): Logger object.
plugin_name (string): the name of the plugin.
step_name (string): the name of the step (if you inherits from
AcquisitionStep).
daemon_name (string): the name of the daemon (if you inherits from
AcquisitionListener).
"""
args = None
unit_tests = False
unit_tests_args = None
__logger = None
plugin_name = None
step_name = "main" # default value
daemon_name = None
def __init__(self):
"""Constructor."""
if self.plugin_name is None:
raise NotImplementedError("plugin_name property must be overriden "
"and set")
step_or_daemon_name = self._get_step_or_daemon_name()
plugin_name = self.plugin_name
regexp = "^[A-Za-z0-9_]+$"
if not re.match(regexp, step_or_daemon_name):
self.error_and_die(
"step_name or daemon_name: %s must match with %s",
step_or_daemon_name,
regexp,
)
if not re.match(regexp, plugin_name):
self.error_and_die(
"plugin_name: %s must match with %s", plugin_name, regexp
)
_set_custom_environment(plugin_name, step_or_daemon_name)
def _init(self):
description = "%s/%s acquisition %s" % (
self.plugin_name,
self._get_step_or_daemon_name(),
self.__class__.__name__,
)
parser = configargparse.ArgumentParser(
description=description,
add_env_var_help=False,
ignore_unknown_config_file_keys=True,
args_for_setting_config_path=["-c", "--config-file"],
config_file_parser_class=partial(
_make_config_file_parser_class,
self.plugin_name,
self._get_step_or_daemon_name(),
),
)
self._add_extra_arguments_before(parser)
self.add_extra_arguments(parser)
self._add_extra_arguments_after(parser)
if self.unit_tests and self.unit_tests_args:
self.args, unknown = parser.parse_known_args(self.unit_tests_args)
else:
self.args, unknown = parser.parse_known_args()
return self.init()
def init(self):
"""Init what you want.
Called after CLI parsing but before processing any files.
"""
pass
def _destroy(self):
return self.destroy()
def destroy(self):
"""Destroy what you want just before exiting.
No file will be processed after calling this method.
"""
pass
def _add_extra_arguments_before(self, parser):
pass
def add_extra_arguments(self, parser):
pass
def _add_extra_arguments_after(self, parser):
pass
def _get_step_or_daemon_name(self):
try:
if self.daemon_name is not None:
return self.daemon_name
except Exception:
pass
try:
if self.step_name is not None:
return self.step_name
except Exception:
pass
return "main"
def get_plugin_directory_path(self):
"""Return the plugin directory (fullpath).
Returns:
(string) the fullpath of the plugin directory.
"""
return os.path.join(
MFMODULE_RUNTIME_HOME, "var", "plugins", self.plugin_name
)
def get_tmp_filepath(self):
"""Get a full temporary filepath (including unique filename).
Returns:
(string) full temporary filepath (including unique filename).
"""
return _get_tmp_filepath(
self.plugin_name, self._get_step_or_daemon_name()
)
def _get_logger(self):
"""Get a logger."""
if not self.__logger:
logger_name = "mfdata.%s.%s" % (
self.plugin_name,
self._get_step_or_daemon_name(),
)
self.__logger = mflog.getLogger(logger_name)
return self.__logger
def error_and_die(self, msg, *args, **kwargs):
"""Log an error message and exit immediatly."""
self.error(msg, *args, **kwargs)
sys.stderr.write("exiting...\n")
sys.stderr.write("stack: %s\n" % "".join(traceback.format_stack()))
os._exit(2)
def warning(self, msg, *args, **kwargs):
"""Log a warning message."""
logger = self._get_logger()
logger.warning(msg, *args, **kwargs)
def debug(self, msg, *args, **kwargs):
"""Log a debug message."""
logger = self._get_logger()
logger.debug(msg, *args, **kwargs)
def info(self, msg, *args, **kwargs):
"""Log an info message."""
logger = self._get_logger()
logger.info(msg, *args, **kwargs)
def critical(self, msg, *args, **kwargs):
"""Log a critical message."""
logger = self._get_logger()
logger.critical(msg, *args, **kwargs)
def error(self, msg, *args, **kwargs):
"""Log an error message."""
logger = self._get_logger()
logger.error(msg, *args, **kwargs)
def exception(self, msg, *args, **kwargs):
"""Log an error message with current exception stacktrace.
This method should only be called from an exception handler.
"""
logger = self._get_logger()
logger.exception(str(msg), *args, **kwargs)
def _get_tag_name(
self,
name,
counter_str_value="latest",
force_process_name=None,
force_plugin_name=None,
):
plugin_name = self.plugin_name
process_name = self._get_step_or_daemon_name()
if force_process_name is not None:
process_name = force_process_name
if force_plugin_name is not None:
plugin_name = force_plugin_name
if plugin_name == "core":
return "%s.%s.%s" % (counter_str_value, plugin_name, name)
else:
return "%s.%s.%s.%s" % (
counter_str_value,
plugin_name,
process_name,
name,
)
def _set_tag_latest(self, xaf, name, value):
tag_name = self._get_tag_name(name, "latest")
self._set_tag(xaf, tag_name, value)
def _set_tag(self, xaf, name, value):
self.debug("Setting tag %s = %s" % (name, value))
xaf.tags[name] = value
def set_tag(self, xaf, name, value, add_latest=True):
"""Set a tag on a file with good prefixes.
Args:
xaf (XattrFile): file to add/set tag on.
name (string): name of the tag (without prefixes)
value (string): value of the tag
add_latest (boolean): add latest prefix
"""
counter_str_value = str(self._get_counter_tag_value(xaf))
tag_name = self._get_tag_name(name, counter_str_value)
self._set_tag(xaf, tag_name, value)
if add_latest:
self._set_tag_latest(xaf, name, value)
def get_tag(
self,
xaf,
name,
not_found_value=None,
counter_str_value="latest",
force_process_name=None,
force_plugin_name=None,
**kwargs
):
"""Read a tag on a file with good prefixes.
Args:
xaf (XattrFile): file to read.
name (string): name of the tag (without prefixes).
not_found_value: returned value if the tag is not found.
counter_str_value (string): counter string value.
force_process_name: tagger name (if None, current
process_name is taken)
force_plugin_name: tagger plugin name (if None, current
plugin name is taken)
kwargs: for compatibility with force_step_name
"""
if "force_step_name" in kwargs.keys():
force_process_name = kwargs["force_step_name"]
tag_name = self._get_tag_name(
name, counter_str_value, force_process_name, force_plugin_name
)
return xaf.tags.get(tag_name, not_found_value)
def _get_counter_tag_value(self, xaf, not_found_value="0"):
tag_name = self._get_tag_name("step_counter", force_plugin_name="core")
return int(xaf.tags.get(tag_name, not_found_value))
def __increment_and_set_counter_tag_value(self, xaf):
tag_name = self._get_tag_name("step_counter", force_plugin_name="core")
counter_value = self._get_counter_tag_value(xaf, not_found_value="-1")
value = six.b("%i" % (counter_value + 1))
self._set_tag(xaf, tag_name, value)
def _get_original_basename_tag_name(self):
return self._get_tag_name(
"original_basename",
force_plugin_name="core",
counter_str_value="first",
)
def _get_original_uid_tag_name(self):
return self._get_tag_name(
"original_uid", force_plugin_name="core", counter_str_value="first"
)
def _get_original_dirname_tag_name(self):
return self._get_tag_name(
"original_dirname",
force_plugin_name="core",
counter_str_value="first",
)
def __set_original_basename_if_necessary(self, xaf):
if hasattr(xaf, "_original_filepath") and xaf._original_filepath:
tag_name = self._get_original_basename_tag_name()
if tag_name not in xaf.tags:
original_basename = str(
os.path.basename(xaf._original_filepath)
)
self._set_tag(xaf, tag_name, original_basename)
def __set_original_uid_if_necessary(self, xaf):
tag_name = self._get_original_uid_tag_name()
if tag_name not in xaf.tags:
original_uid = get_unique_hexa_identifier()
self._set_tag(xaf, tag_name, original_uid)
def __set_original_dirname_if_necessary(self, xaf):
if hasattr(xaf, "_original_filepath") and xaf._original_filepath:
tag_name = self._get_original_dirname_tag_name()
if tag_name not in xaf.tags:
dirname = os.path.dirname(xaf._original_filepath)
original_dirname = str(os.path.basename(dirname))
self._set_tag(xaf, tag_name, original_dirname)
def _set_before_tags(self, xaf):
current = _get_current_utc_datetime_with_ms()
self.__increment_and_set_counter_tag_value(xaf)
self.set_tag(xaf, "enter_step", current, add_latest=False)
self.__set_original_basename_if_necessary(xaf)
self.__set_original_uid_if_necessary(xaf)
self.__set_original_dirname_if_necessary(xaf)