Module acquisition.base
Classes
class AcquisitionBase
-
Abstract class to describe an acquisition base.
You have to override this class.
Attributes
unit_tests
:boolean
- if True, we are in unit tests mode.
unit_tests_args
:string
- cli args (for unit tests).
Constructor.
Expand source code
class AcquisitionBase(object): """Abstract class to describe an acquisition base. You have to override this class. Attributes: unit_tests (boolean): if True, we are in unit tests mode. unit_tests_args (string): cli args (for unit tests). """ unit_tests = False unit_tests_args = None def __init__(self): """Constructor.""" self.plugin_name = os.environ.get("MFDATA_CURRENT_PLUGIN_NAME", None) if self.plugin_name is None: if self.unit_tests: self.plugin_name = "unittests" else: raise Exception("you have to execute this inside a plugin_env") validate_plugin_name(self.plugin_name) self.args = None self.__logger = None def _init(self): description = "%s/%s acquisition step" % ( self.plugin_name, self.__class__.__name__, ) parser = argparse.ArgumentParser(description=description) parser.add_argument("--step-name", action="store", default="main", help="step name") self._add_extra_arguments_before(parser) self.add_extra_arguments(parser) self._add_extra_arguments_after(parser) if self.unit_tests and self.unit_tests_args: self.args, unknown = parser.parse_known_args(self.unit_tests_args) else: self.args, unknown = parser.parse_known_args() self.step_name = self.args.step_name regexp = "^[A-Za-z0-9_]+$" if not re.match(regexp, self.step_name): self.error_and_die( "step_name: %s must match with %s", self.step_name, regexp, ) return self.init() def init(self): """Init what you want. Called after CLI parsing but before processing any files. """ pass def _destroy(self): return self.destroy() def destroy(self): """Destroy what you want just before exiting. No file will be processed after calling this method. """ pass def _add_extra_arguments_before(self, parser): pass def add_extra_arguments(self, parser): pass def _add_extra_arguments_after(self, parser): pass def get_plugin_directory_path(self): """Return the plugin directory (fullpath). Returns: (string) the fullpath of the plugin directory. """ return os.path.join( MFMODULE_RUNTIME_HOME, "var", "plugins", self.plugin_name ) def get_tmp_filepath(self, forced_basename=None): """Get a full temporary filepath (including unique filename). Args: forced_basename (string): if not None, use the given string as basename. If None, the basename is a random identifier. Returns: (string) full temporary filepath (including unique filename). """ return _get_tmp_filepath( self.plugin_name, self.step_name, forced_basename=forced_basename ) def _get_logger(self): """Get a logger.""" if not self.__logger: logger_name = "mfdata.%s.%s" % ( self.plugin_name, self.step_name if self.step_name is not None else "notset", ) self.__logger = mflog.getLogger(logger_name) return self.__logger def error_and_die(self, msg, *args, **kwargs): """Log an error message and exit immediatly.""" self.error(msg, *args, **kwargs) sys.stderr.write("exiting...\n") sys.stderr.write("stack: %s\n" % "".join(traceback.format_stack())) os._exit(2) def warning(self, msg, *args, **kwargs): """Log a warning message.""" logger = self._get_logger() logger.warning(msg, *args, **kwargs) def debug(self, msg, *args, **kwargs): """Log a debug message.""" logger = self._get_logger() logger.debug(msg, *args, **kwargs) def info(self, msg, *args, **kwargs): """Log an info message.""" logger = self._get_logger() logger.info(msg, *args, **kwargs) def critical(self, msg, *args, **kwargs): """Log a critical message.""" logger = self._get_logger() logger.critical(msg, *args, **kwargs) def error(self, msg, *args, **kwargs): """Log an error message.""" logger = self._get_logger() logger.error(msg, *args, **kwargs) def exception(self, msg, *args, **kwargs): """Log an error message with current exception stacktrace. This method should only be called from an exception handler. """ logger = self._get_logger() logger.exception(str(msg), *args, **kwargs) def __get_config_value(self, section, key, transform=None, default=None): env_var = "%s_CURRENT_PLUGIN_%s_%s" % \ (MFMODULE, section.replace('-', '_').upper(), key.replace('-', '_').upper()) val = os.environ.get(env_var, default) if isinstance(val, Exception): # pylint: disable=E0702 raise val if transform is not None: try: val = transform(val) except Exception as e: raise Exception("can't call transform function on " "configuration key: [%s]/%s with " "exception: %s" % (section, key, e)) return val def get_config_value(self, key, transform=None, default=Exception()): return self.__get_config_value("step_%s" % self.step_name, key, transform=transform, default=default) def get_custom_config_value(self, key, transform=None, default=Exception()): return self.__get_config_value("custom", key, transform=transform, default=default) def _get_tag_name( self, name, counter_str_value="latest", force_process_name=None, force_plugin_name=None, ): plugin_name = self.plugin_name process_name = self.step_name if force_process_name is not None: process_name = force_process_name if force_plugin_name is not None: plugin_name = force_plugin_name if plugin_name == "core": return "%s.%s.%s" % (counter_str_value, plugin_name, name) else: return "%s.%s.%s.%s" % ( counter_str_value, plugin_name, process_name, name, ) def _set_tag_latest(self, xaf, name, value, info=False): tag_name = self._get_tag_name(name, "latest") self._set_tag(xaf, tag_name, value, info=info) def _set_tag(self, xaf, name, value, info=False): log = self.info if info else self.debug log("Setting tag %s = %s" % (name, value)) xaf.tags[name] = value def set_tag(self, xaf, name, value, add_latest=True, info=False): """Set a tag on a file with good prefixes. Args: xaf (XattrFile): file to add/set tag on. name (string): name of the tag (without prefixes) value (string): value of the tag add_latest (boolean): add latest prefix info (boolean): if True, a log info message is send (else debug) """ counter_str_value = str(self._get_counter_tag_value(xaf)) tag_name = self._get_tag_name(name, counter_str_value) self._set_tag(xaf, tag_name, value, info=info) if add_latest: self._set_tag_latest(xaf, name, value, info=info) def get_tag( self, xaf, name, not_found_value=None, counter_str_value="latest", force_process_name=None, force_plugin_name=None, **kwargs ): """Read a tag on a file with good prefixes. Args: xaf (XattrFile): file to read. name (string): name of the tag (without prefixes). not_found_value: returned value if the tag is not found. counter_str_value (string): counter string value. force_process_name: tagger name (if None, current process_name is taken) force_plugin_name: tagger plugin name (if None, current plugin name is taken) kwargs: for compatibility with force_step_name """ if "force_step_name" in kwargs.keys(): force_process_name = kwargs["force_step_name"] tag_name = self._get_tag_name( name, counter_str_value, force_process_name, force_plugin_name ) return xaf.tags.get(tag_name, not_found_value) def _get_counter_tag_value(self, xaf, not_found_value="0"): tag_name = self._get_tag_name("step_counter", force_plugin_name="core") return int(xaf.tags.get(tag_name, not_found_value)) def __increment_and_set_counter_tag_value(self, xaf): tag_name = self._get_tag_name("step_counter", force_plugin_name="core") counter_value = self._get_counter_tag_value(xaf, not_found_value="-1") value = six.b("%i" % (counter_value + 1)) self._set_tag(xaf, tag_name, value) def _get_original_basename_tag_name(self): return self._get_tag_name( "original_basename", force_plugin_name="core", counter_str_value="first", ) def _get_original_uid_tag_name(self): return self._get_tag_name( "original_uid", force_plugin_name="core", counter_str_value="first" ) def _get_original_dirname_tag_name(self): return self._get_tag_name( "original_dirname", force_plugin_name="core", counter_str_value="first", ) def __set_original_basename_if_necessary(self, xaf): if hasattr(xaf, "_original_filepath") and xaf._original_filepath: tag_name = self._get_original_basename_tag_name() if tag_name not in xaf.tags: original_basename = str( os.path.basename(xaf._original_filepath) ) self._set_tag(xaf, tag_name, original_basename, info=True) def _set_original_uid_if_necessary(self, xaf, forced_uid=None): tag_name = self._get_original_uid_tag_name() if tag_name not in xaf.tags: if forced_uid is None: original_uid = get_unique_hexa_identifier() else: original_uid = forced_uid self._set_tag(xaf, tag_name, original_uid, info=True) def __set_original_dirname_if_necessary(self, xaf): if hasattr(xaf, "_original_filepath") and xaf._original_filepath: tag_name = self._get_original_dirname_tag_name() if tag_name not in xaf.tags: dirname = os.path.dirname(xaf._original_filepath) original_dirname = str(os.path.basename(dirname)) add_trace(xaf, "<%s" % original_dirname, "", self.plugin_name, self.step_name) self._set_tag(xaf, tag_name, original_dirname, info=True) def _set_before_tags(self, xaf): current = _get_current_utc_datetime_with_ms() self.__increment_and_set_counter_tag_value(xaf) self.set_tag(xaf, "enter_step", current, add_latest=False, info=False) self.__set_original_basename_if_necessary(xaf) self._set_original_uid_if_necessary(xaf) self.__set_original_dirname_if_necessary(xaf) def add_trace(self, xaf, to_plugin_name, to_step_name=""): add_trace(xaf, self.plugin_name, self.step_name, to_plugin_name, to_step_name) def add_virtual_trace(self, to_plugin_name, to_step_name=""): add_virtual_trace(self.plugin_name, self.step_name, to_plugin_name, to_step_name)
Subclasses
Class variables
var unit_tests
var unit_tests_args
Methods
def add_extra_arguments(self, parser)
def add_trace(self, xaf, to_plugin_name, to_step_name='')
def add_virtual_trace(self, to_plugin_name, to_step_name='')
def critical(self, msg, *args, **kwargs)
-
Log a critical message.
def debug(self, msg, *args, **kwargs)
-
Log a debug message.
def destroy(self)
-
Destroy what you want just before exiting.
No file will be processed after calling this method.
def error(self, msg, *args, **kwargs)
-
Log an error message.
def error_and_die(self, msg, *args, **kwargs)
-
Log an error message and exit immediatly.
def exception(self, msg, *args, **kwargs)
-
Log an error message with current exception stacktrace.
This method should only be called from an exception handler.
def get_config_value(self, key, transform=None, default=Exception())
def get_custom_config_value(self, key, transform=None, default=Exception())
def get_plugin_directory_path(self)
-
Return the plugin directory (fullpath).
Returns
(string) the fullpath of the plugin directory.
def get_tag(self, xaf, name, not_found_value=None, counter_str_value='latest', force_process_name=None, force_plugin_name=None, **kwargs)
-
Read a tag on a file with good prefixes.
Args
xaf
:XattrFile
- file to read.
name
:string
- name of the tag (without prefixes).
not_found_value
- returned value if the tag is not found.
counter_str_value
:string
- counter string value.
force_process_name
- tagger name (if None, current process_name is taken)
force_plugin_name
- tagger plugin name (if None, current plugin name is taken)
kwargs
- for compatibility with force_step_name
def get_tmp_filepath(self, forced_basename=None)
-
Get a full temporary filepath (including unique filename).
Args
forced_basename
:string
- if not None, use the given string as basename. If None, the basename is a random identifier.
Returns
(string) full temporary filepath (including unique filename).
def info(self, msg, *args, **kwargs)
-
Log an info message.
def init(self)
-
Init what you want.
Called after CLI parsing but before processing any files.
def set_tag(self, xaf, name, value, add_latest=True, info=False)
-
Set a tag on a file with good prefixes.
Args
xaf
:XattrFile
- file to add/set tag on.
name
:string
- name of the tag (without prefixes)
value
:string
- value of the tag
add_latest
:boolean
- add latest prefix
info
:boolean
- if True, a log info message is send (else debug)
def warning(self, msg, *args, **kwargs)
-
Log a warning message.