Package acquisition
Sub-modules
acquisition.archive_step
acquisition.base
acquisition.batch_step
acquisition.copy_step
acquisition.decorators
acquisition.delete_step
acquisition.fork_step
acquisition.inject_file
acquisition.listener
acquisition.move_step
acquisition.reinject_step
acquisition.stats
acquisition.step
acquisition.switch_rules
acquisition.switch_step
acquisition.transform_step
acquisition.unbzip2_step
acquisition.uncompress_step
acquisition.ungzip_step
acquisition.utils
Classes
class AcquisitionArchiveStep
-
Class to describe an archive acquisition step.
Constructor.
Expand source code
class AcquisitionArchiveStep(AcquisitionMoveStep):
    """Class to describe an archive acquisition step.

    The destination directory must be an absolute path. Unless the
    ``--keep-tags-suffix`` value is empty or "null", the tags of each file
    are written into a side file (target path + suffix) before the move.
    """

    def add_extra_arguments(self, parser):
        """Declare the command line options of this step.

        Args:
            parser: argparse-like parser receiving the options.
        """
        self.add_dest_arguments(parser)
        self.add_force_chmod_argument(parser)
        keep_tags_help = (
            'suffix to add to the basename to keep tags'
            ' (put a null or empty value to have no tags at all'
        )
        parser.add_argument('--keep-tags-suffix', action='store',
                            default=".tags", help=keep_tags_help)

    def _init(self):
        """Run the parent initialization, then apply archive constraints.

        Raises:
            Exception: if dest-dir is not an absolute path.
        """
        AcquisitionMoveStep._init(self)
        dest_is_absolute = self.args.dest_dir.startswith('/')
        if not dest_is_absolute:
            raise Exception("dest-dir must be an absolute directory")
        # tags are always dropped by this step (whatever the parent
        # initialization decided)
        self.drop_tags = True
        self.keep_tags_suffix = self.args.keep_tags_suffix.strip()

    def before_move(self, xaf):
        """Prepare the target directory and the optional tags file.

        Args:
            xaf: file object about to be moved.

        Returns:
            The target filepath, or False if the target directory
            could not be created.
        """
        target = os.path.join(self.dest_dir, self.compute_basename(xaf))
        if not mkdir_p(os.path.dirname(target)):
            return False
        tags_wanted = self.keep_tags_suffix not in ("", "null")
        if tags_wanted:
            xaf.write_tags_in_a_file(target + self.keep_tags_suffix)
        return target
Ancestors
Methods
def before_move(self, xaf)
Inherited members
class AcquisitionBatchStep
-
Abstract class to describe a batch acquisition step.
You have to override this class.
Constructor.
Expand source code
class AcquisitionBatchStep(AcquisitionStep):
    """Abstract class to describe a batch acquisition step.

    You have to override this class.

    Incoming files are accumulated in an AcquisitionBatch object; the whole
    batch is handed to batch_process() once it is ready (or when forced).
    """

    # Current batch (lazily created, see the _batch property)
    __batch = None

    def __init__(self):
        super(AcquisitionBatchStep, self).__init__()

    @property
    def batch_process_max_size(self):
        """Return the max size of a batch in batch process mode.

        If not overridden, the default value is 100.
        """
        return 100

    @property
    def batch_process_max_wait(self):
        """Max wait (in seconds) to fill the batch in batch process mode.

        If not overridden, the default value is 10 (so 10 seconds).
        """
        return 10

    @property
    def _batch(self):
        """Return the current batch (created on first access)."""
        # Lazy init of AcquisitionBatch (to be able to have dynamic
        # batch_process_max_size/batch_process_max_wait)
        if self.__batch is None:
            self.__batch = AcquisitionBatch(self.batch_process_max_size,
                                            self.batch_process_max_wait)
        return self.__batch

    def _reinit_batch(self):
        """Reinit the current batch."""
        self.__batch = None

    def process(self, xaf):
        """Not available in batch process mode.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("process() method should not be called in "
                                  "batch process mode")

    def batch_process(self, xafs):
        """Process a batch of files.

        Process several XattrFile objects (between 1 and
        batch_process_max_size files in one call). You have to override
        this method.

        Files are moved into a temporary directory before the call with
        unique filenames. Extended attributes are copied to them. So you can
        do what you want with them.

        If the method returns True:

        - we consider that the processing is ok
        - all files are deleted if necessary

        If the method returns False:

        - we consider that the result is False for each file
        - so each file follows the failure policy

        If the method returns an array of booleans (of the same size as the
        xafs array), each item is the processing status of the
        corresponding file.

        Args:
            xafs (list): XattrFile objects (list of files to process).

        Returns:
            Processing status (True, False or array of booleans).

        Raises:
            NotImplementedError: if the method is not overridden.
        """
        raise NotImplementedError("batch_process() method must be overriden "
                                  "in \"batch process mode\"")

    def _destroy(self):
        """Process the pending batch (if any), then destroy the step."""
        try:
            if self._batch.get_size():
                # we have a last batch to process
                # let's process it by force
                self._conditional_process_batch(force=True)
        except TypeError:
            # NOTE(review): TypeError is deliberately ignored here; the
            # reason is not visible from this class => to be confirmed
            pass
        super(AcquisitionBatchStep, self)._destroy()

    def _ping(self, *args, **kwargs):
        """Give the batch a chance to be processed, then ping the parent."""
        self._conditional_process_batch()
        super(AcquisitionBatchStep, self)._ping(*args, **kwargs)

    def _is_batch_ready(self, force=False):
        """Tell if the current batch must be processed now.

        Args:
            force (boolean): if True, a non empty batch is always ready.

        Returns:
            True if the batch has to be processed.
        """
        if force and self._batch.get_size() > 0:
            return True
        return self._batch.is_ready()

    def _before(self, xaf):
        """Append the file to the current batch.

        Always returns False: the file is never processed individually,
        it waits in the batch for batch_process().
        """
        status = super(AcquisitionBatchStep, self)._before(xaf)
        if not status:
            return False
        self._batch.append(xaf)
        self.set_tag(xaf, "batch_id", self._batch.id, add_latest=False)
        self.info("File %s added in batch %s", xaf._original_filepath,
                  self._batch.id)
        return False

    def _conditional_process_batch(self, force=False):
        """Process (then reinit) the current batch if it is ready.

        Args:
            force (boolean): if True (or in debug mode), a non empty batch
                is processed whatever its size/age.
        """
        if self._is_batch_ready(force or self._debug_mode):
            self.debug("Batch %s is ready (size: %i, age: %s seconds) => "
                       "let's process it", self._batch.id,
                       self._batch.get_size(), self._batch.get_age())
            self._process_batch()
            self._reinit_batch()
        else:
            if self._batch.get_size() > 0:
                self.debug("Batch %s is not ready (size: %i, age: %s seconds)",
                           self._batch.id, self._batch.get_size(),
                           self._batch.get_age())

    def _process(self, xaf):
        """Run the parent processing, then process the batch if ready."""
        res = super(AcquisitionBatchStep, self)._process(xaf)
        self._conditional_process_batch()
        return res

    def _process_batch(self):
        """Call batch_process() on the current batch and update the stats."""
        self.info("Start the processing of batch %s...", self._batch.id)
        timer = self.get_stats_client().timer("processing_batch_timer")
        timer.start()
        xafs = self._batch.get_xafs()
        # the size is computed before the processing (files can be moved
        # or deleted by it)
        size = sum(x.getsize() for x in xafs)
        process_status = \
            self._exception_safe_call(self.batch_process, [xafs], {},
                                      "batch_process", False)
        after_status = self._after_batch(xafs, process_status)
        self.get_stats_client().incr("number_of_processed_files", len(xafs))
        self.get_stats_client().incr("bytes_of_processed_files", size)
        self.get_stats_client().incr("number_of_processed_batches", 1)
        timer.stop()
        self.info("End of the processing of batch %s...", self._batch.id)
        if not after_status:
            self.get_stats_client().incr("number_of_processed_"
                                         "batches_error", 1)
            self.warning("Bad processing status for batch: %s",
                         self._batch.id)

    def _after_batch(self, xafs, process_status):
        """Apply the parent _after() hook to each file of the batch.

        Args:
            xafs (list): files of the batch.
            process_status: value returned by batch_process() (None, a
                boolean or a sequence of booleans, one per file).

        Returns:
            The global status of the batch (falsy if at least one file
            failed).
        """
        if process_status is None:
            process_status = False
        if isinstance(process_status, bool):
            for xaf in xafs:
                super(AcquisitionBatchStep, self)._after(xaf, process_status)
            return process_status
        if len(process_status) != len(xafs):
            self.warning("bad process status len(process_status) = %i "
                         "which is different than len(xafs) = %i" %
                         (len(process_status), len(xafs)))
            # we can't map statuses to files => every file of the batch
            # is considered as failed
            for xaf in xafs:
                super(AcquisitionBatchStep, self)._after(xaf, False)
            return False
        for (xaf, pstatus) in zip(xafs, process_status):
            super(AcquisitionBatchStep, self)._after(xaf, pstatus)
        return min(process_status)
Ancestors
Instance variables
prop batch_process_max_size
-
Return the max size of a batch in batch process mode.
If not overridden, the default value is 100.
Expand source code
@property
def batch_process_max_size(self):
    """Maximum number of files in a batch (batch process mode).

    This property can be overridden; the default value is 100.
    """
    default_max_size = 100
    return default_max_size
prop batch_process_max_wait
-
Max wait (in seconds) to fill the batch in batch process mode.
If not overridden, the default value is 10 (so 10 seconds).
Expand source code
@property
def batch_process_max_wait(self):
    """Maximum time (in seconds) to wait for a batch to fill up.

    This property can be overridden; the default value is 10
    (so 10 seconds).
    """
    default_max_wait = 10
    return default_max_wait
Methods
def batch_process(self, xafs)
-
Process a batch of files.
Process several XattrFile objects (between 1 and batch_process_max_size files in one call). You have to override this method.
Files are moved into a temporary directory (with unique filenames) before the call. Their extended attributes are copied along with them, so you can do whatever you want with these files.
If the method returns True: - we consider that the processing is OK for every file - all files are deleted if necessary
If the method returns False: - we consider that the processing failed for every file - so each file follows the failure policy
If the method returns an array of booleans (of the same size as the xafs array), each item is taken as the processing status of the corresponding file.
Args
xafs
:list
- XattrFile objects (list of files to process).
Returns
Processing status (True, False or array of booleans).
Inherited members
class AcquisitionCopyStep
-
Abstract class to describe an acquisition step.
You have to override this class.
Attributes
stop_flag
:boolean
- if True, stop the daemon as soon as possible.
debug_mode_allowed
:boolean
- if True, the debug mode is allowed.
step_limit
:int
- maximum step number (to avoid some loops).
Constructor.
Expand source code
class AcquisitionCopyStep(AcquisitionStep):
    """Acquisition step dispatching each file to plugin/step targets.

    Targets come from the ``--dest-dirs`` option (comma separated
    ``plugin_name/step_name`` items; a trailing ``*`` on the step name
    selects hardlinking instead of copying).
    """

    def _init(self):
        """Parse ``--dest-dirs`` into (plugin_name, step_name, hardlink).

        Raises:
            Exception: if a target is not of the form plugin/step.
        """
        AcquisitionStep._init(self)
        self.dest_dirs = []
        self.uid = os.getuid()
        if self.args.dest_dirs == "":
            return
        for target in self.args.dest_dirs.split(','):
            pieces = target.split('/')
            malformed = ('/' not in target or target.startswith('/')
                         or len(pieces) != 2)
            if malformed:
                raise Exception("invalid target: %s in dest-dirs: %s" %
                                (target, self.args.dest_dirs))
            raw_plugin, raw_step = pieces
            # a trailing "*" on the step name means "hardlink allowed"
            hardlink = raw_step.endswith('*')
            if hardlink:
                raw_step = raw_step[:-1]
            plugin_name = raw_plugin.strip()
            step_name = raw_step.strip()
            self.dest_dirs.append((plugin_name, step_name, hardlink))
            self.add_virtual_trace(plugin_name, step_name)

    def add_extra_arguments(self, parser):
        """Declare the ``--dest-dirs`` command line option."""
        dest_dirs_help = (
            'coma separated target plugin/steps (example: plugin1/step1,'
            'plugin2/step2,plugin3/step3*) (if the step name ends with a "*", '
            "we use hardlink instead of copy (but the target step MUST NOT "
            "MODIFY the incoming file in any way, so don't add a '*' if "
            "you are not sure!)"
        )
        parser.add_argument('--dest-dirs', action='store', default="",
                            help=dest_dirs_help)

    def _fix_uid(self, xaf):
        """Replace the file by a copy owned by the current user.

        Returns:
            The copy if the copy succeeded (the original is then deleted
            on a best-effort basis), the original file otherwise.
        """
        self.info("the file: %s is owned by uid:%i and not by current "
                  "uid: %i => let's copy it to change its ownership",
                  xaf.filepath, xaf.getuid(), self.uid)
        copy_path = self.get_tmp_filepath()
        try:
            owned_xaf = xaf.copy(copy_path)
        except Exception:
            self.warning("can't copy %s to %s", xaf.filepath, copy_path)
            return xaf
        self.info("%s copied to %s", xaf.filepath, copy_path)
        deleted = xaf.delete_or_nothing()
        if not deleted:
            self.warning("can't delete: %s", xaf.filepath)
        return owned_xaf

    def before_copy(self, xaf):
        """Hook returning the targets to use for the given file.

        The default implementation returns the configured dest_dirs.
        Returning None or False makes process() stop with a success.
        """
        return self.dest_dirs

    def process(self, xaf):
        """Dispatch the file to every target.

        Returns:
            A truthy value on success, a falsy one on failure.
        """
        self.info("Processing file: %s" % xaf.filepath)
        try:
            dest_dirs = self.before_copy(xaf)
            if dest_dirs is None or dest_dirs is False:
                self.debug("before_copy() returned None or False "
                           "=> we do nothing more")
                return True
        except Exception:
            self.exception("exception during before_copy() => failure")
            return False
        owner_uid = xaf.getuid()
        fix_uid = owner_uid is not None and owner_uid != self.uid
        common = {"info": True, "keep_original_basename": True}
        if len(dest_dirs) == 0:
            self.info("No dest-dirs => deleting %s" % xaf.filepath)
            xaf.delete_or_nothing()
            return True
        if len(dest_dirs) == 1:
            # shortpath if we have only one action: whatever the hardlink
            # flag, we copy when the ownership must be fixed (to be sure
            # that the file will be owned by current user) and a single
            # move is ok otherwise
            plugin_name, step_name, _hardlink = dest_dirs[0]
            if fix_uid:
                return self.copy_to_plugin_step(xaf, plugin_name, step_name,
                                                **common)
            return self.move_to_plugin_step(xaf, plugin_name, step_name,
                                            **common)
        # more than one target
        can_hardlink = any(flag for _, _, flag in dest_dirs)
        if fix_uid and can_hardlink:
            xaf = self._fix_uid(xaf)
        result = True
        hardlink_used = False
        for plugin_name, step_name, hardlink in dest_dirs[:-1]:
            # NOTE(review): "result and ..." short-circuits => after a first
            # failure, the remaining targets are not served anymore
            if hardlink:
                result = result and self.hardlink_to_plugin_step(
                    xaf, plugin_name, step_name, **common)
                hardlink_used = True
            else:
                result = result and self.copy_to_plugin_step(
                    xaf, plugin_name, step_name, **common)
        # Special case for last directory (we can optimize a little bit)
        # If there is no error:
        #     If the last directory allow hardlinking => move
        #     If the last directory does not allow hardlinking
        #         If we used hardlinking for other directories => copy
        #         If we didn't use hardlinking for other directories => move
        # If there is some errors:
        #     => copy to keep the original file for trash policy
        plugin_name, step_name, hardlink = dest_dirs[-1]
        if not result:
            # NOTE(review): result is falsy here, so this expression never
            # calls copy_to_plugin_step() (kept as is to preserve behavior)
            result = result and self.copy_to_plugin_step(
                xaf, plugin_name, step_name, **common)
            return result
        if hardlink or not hardlink_used:
            # the last target accepts a hardlink, or no hardlink was used
            # before => we can move the file
            result = result and self.move_to_plugin_step(
                xaf, plugin_name, step_name, **common)
        else:
            # some hardlinks point to this file => we have to copy
            result = result and self.copy_to_plugin_step(
                xaf, plugin_name, step_name, **common)
        return result
Ancestors
Subclasses
Methods
def before_copy(self, xaf)
Inherited members
class AcquisitionForkStep
-
Class to describe a fork acquisition step.
Constructor.
Expand source code
class AcquisitionForkStep(AcquisitionTransformStep):
    """
    Class to describe a fork acquisition step.

    For each incoming file, a command (built from a template) is executed.
    When ``command_returning_path`` is enabled, the filepaths printed by the
    command (lines starting with ``FILEPATH:``) are returned by transform().
    """

    # Just to avoid an invalid parameter exception with
    # command_returning_path=0
    # (this value will never be used)
    dest_dir_default = "FIXME/FIXME"

    def add_extra_arguments(self, parser):
        """Declare ``--command-template`` and ``--command-returning-path``."""
        AcquisitionTransformStep.add_extra_arguments(self, parser)
        parser.add_argument(
            '--command-template', action='store', default=None,
            help='command template to execute ({PATH} string '
            'will be replaced by the file fullpath, '
            '{{MFDATA_CURRENT_PLUGIN_DIR}} by the full plugin dir path)')
        parser.add_argument(
            '--command-returning-path', action='store',
            help='if set to 1, we will parse the command output '
            '(on stdout only) '
            'and lines starting with FILEPATH: string will be '
            'interpreted as a full filepath which will be injected in '
            'dest-dir with tags (WARNING: the returned filepath will be '
            'deleted)')

    def _init(self):
        """Validate the configuration of the step.

        Raises:
            Exception: if the command template is missing, if it does not
                contain {PATH} or if command_returning_path is enabled
                without a real dest_dir.
        """
        AcquisitionTransformStep._init(self)
        # the option has no default value => it is None when not provided
        returning_path = self.args.command_returning_path or ""
        self.command_returning_path = returning_path.strip() == "1"
        if self.args.command_template is None or \
                self.args.command_template == "FIXME":
            raise Exception('you have to set a command-template')
        if "{PATH}" not in self.args.command_template:
            raise Exception('{PATH} is not in the command template')
        if self.args.dest_dir == "FIXME/FIXME" and \
                self.command_returning_path:
            raise Exception("command_returning_path == 1 => you have to set "
                            "a valid dest_dir")

    def get_command(self, filepath):
        """Return the command to execute for the given filepath.

        Args:
            filepath (string): value substituted to {PATH}.

        Returns:
            The command template with its placeholders replaced.
        """
        # NOTE(review): filepath is inserted as is (no shell quoting)
        return self.args.command_template\
            .replace('{PATH}', filepath)\
            .replace('{{MFDATA_CURRENT_PLUGIN_DIR}}',
                     self.get_plugin_directory_path())

    def transform(self, xaf):
        """Execute the command on the given file.

        Args:
            xaf: file object to process.

        Returns:
            The list of returned filepaths when command_returning_path is
            enabled and the command succeeded with valid paths only,
            None otherwise.
        """
        cmd = self.get_command(xaf.filepath)
        self.info("Calling %s ...", cmd)
        x = BashWrapper(cmd)
        if not x:
            self.warning("%s returned a bad return code: %i, details: %s",
                         cmd, x.code, x)
            return
        self.debug("%s returned a good return code, output: %s", cmd, x)
        if not self.command_returning_path:
            return
        paths = []
        for raw_line in x.stdout.split("\n"):
            line = raw_line.strip()
            if not line.startswith("FILEPATH:"):
                continue
            path = line[len("FILEPATH:"):].strip()
            # NOTE(review): the messages below say "ignoring" but a bad
            # path aborts the whole parsing (None is returned and the
            # paths already collected are dropped) => to be confirmed
            if not path.startswith('/'):
                self.warning("returned path: %s does not start with / "
                             "=> ignoring" % path)
                return
            if not os.path.exists(path):
                self.warning("returned path: %s does not exist "
                             "=> ignoring" % path)
                return
            self.debug("returned path = %s" % path)
            paths.append(path)
        return paths
Ancestors
Class variables
var dest_dir_default
Methods
def get_command(self, filepath)
def transform(self, xaf)
Inherited members
class AcquisitionListener
-
Abstract class to describe an acquisition listener.
You have to override this class.
Constructor.
Expand source code
class AcquisitionListener(AcquisitionBase):
    """Abstract class to describe an acquisition listener.

    You have to override this class.
    """

    def listen(self):
        """Listen to something.

        You have to override this method.

        When this method returns, the execution is stopped.

        Raises:
            NotImplementedError: if the method is not overridden.
        """
        raise NotImplementedError("listen() method must be overriden")

    def run(self):
        """Init the listener, listen, then destroy the listener.

        The destroy phase is executed even if listen() raises
        (the exception is then propagated to the caller).
        """
        self._init()
        try:
            self.listen()
        finally:
            self._destroy()
Ancestors
Methods
def listen(self)
-
Listen to something.
You have to override this method.
When this method returns, the execution is stopped.
def run(self)
Inherited members
class AcquisitionMoveStep
-
Abstract class to describe an acquisition step.
You have to override this class.
Attributes
stop_flag
:boolean
- if True, stop the daemon as soon as possible.
debug_mode_allowed
:boolean
- if True, the debug mode is allowed.
step_limit
:int
- maximum step number (to avoid some loops).
Constructor.
Expand source code
class AcquisitionMoveStep(AcquisitionStep):
    """Acquisition step moving (or copying) each file into a directory.

    The destination is either an absolute directory or a
    ``plugin_name/step_name`` target; the target basename is computed
    from a template (see compute_basename()).
    """

    def _init(self):
        """Validate the arguments and compute the step configuration.

        Raises:
            Exception: on unset/invalid dest_dir, invalid drop_tags value
                or empty dest_basename.
        """
        AcquisitionStep._init(self)
        dest_arg = self.args.dest_dir
        if dest_arg == "FIXME":
            raise Exception("dest_dir is not set")
        if '/' not in dest_arg:
            raise Exception("invalid dest_dir: %s" % dest_arg)
        absolute_dest = dest_arg.startswith('/')
        if absolute_dest:
            self.dest_dir = dest_arg
        else:
            plugin_name, step_name = dest_arg.split('/')[:2]
            self.dest_dir = get_plugin_step_directory_path(plugin_name,
                                                           step_name)
        mkdir_p_or_die(self.dest_dir)
        self.add_virtual_trace(self.dest_dir)
        # the drop_tags/force_chmod arguments are optional (a subclass can
        # choose not to declare them)
        try:
            drop_tags_arg = self.args.drop_tags
        except AttributeError:
            self.drop_tags = True
        else:
            if drop_tags_arg == "AUTO":
                self.drop_tags = absolute_dest
            elif drop_tags_arg in ("1", "0"):
                self.drop_tags = drop_tags_arg == "1"
            else:
                raise Exception("invalid value for drop_tags configuration "
                                "key: %s" % drop_tags_arg)
        try:
            chmod_arg = self.args.force_chmod
        except AttributeError:
            chmod_arg = ""
        self.force_chmod = "" if chmod_arg == "null" else chmod_arg
        basename_template = self.args.dest_basename
        if basename_template == "":
            raise Exception("dest_basename can't be empty")
        self.dest_basename = basename_template

    def add_dest_arguments(self, parser):
        """Declare the ``--dest-dir`` and ``--dest-basename`` options."""
        dest_dir_help = ('destination directory (can be an absolute '
                         'path or something like "plugin_name/step_name")')
        parser.add_argument('--dest-dir', action='store', default="FIXME",
                            help=dest_dir_help)
        parser.add_argument('--dest-basename', action='store',
                            default="{ORIGINAL_UID}",
                            help='target basename (under dest_dir)')

    def add_force_chmod_argument(self, parser):
        """Declare the ``--force-chmod`` option."""
        chmod_help = ('if non empty, force chmod on files after '
                      'move with well known octal value '
                      '(example : 0700)')
        parser.add_argument('--force-chmod', action='store', default="",
                            help=chmod_help)

    def add_extra_arguments(self, parser):
        """Declare all the options of this step."""
        self.add_dest_arguments(parser)
        drop_tags_help = ('1 (yes), 0 (no) or '
                          'AUTO (yes for absolute dest-dir)')
        parser.add_argument('--drop-tags', action='store', default="AUTO",
                            help=drop_tags_help)
        self.add_force_chmod_argument(parser)

    def compute_basename(self, xaf):
        """Compute the target basename from the dest_basename template.

        Placeholders ({RANDOM_ID}, {ORIGINAL_BASENAME}, {ORIGINAL_DIRNAME},
        {ORIGINAL_UID}, {STEP_COUNTER}) are replaced; if the result
        contains a "%", it is then used as a strftime() format with the
        current local date/time.

        Returns:
            The computed basename (string).
        """
        counter = self._get_counter_tag_value(xaf, not_found_value='999')
        substitutions = (
            ("{RANDOM_ID}", get_unique_hexa_identifier()),
            ("{ORIGINAL_BASENAME}", self.get_original_basename(xaf)),
            ("{ORIGINAL_DIRNAME}", self.get_original_dirname(xaf)),
            ("{ORIGINAL_UID}", self.get_original_uid(xaf)),
            ("{STEP_COUNTER}", str(counter)),
        )
        basename = self.dest_basename
        for placeholder, value in substitutions:
            basename = basename.replace(placeholder, value)
        if '%' not in basename:
            return basename
        return datetime.datetime.now().strftime(basename)

    def before_move(self, xaf):
        """Hook returning the target filepath of the given file.

        Returning None or False makes process() stop with a success.
        """
        return os.path.join(self.dest_dir, self.compute_basename(xaf))

    def process(self, xaf):
        """Move (or copy) the file to its target filepath.

        Returns:
            True on success (or when before_move() gave no target),
            False otherwise.
        """
        self.info("Processing file: %s" % xaf.filepath)
        try:
            new_filepath = self.before_move(xaf)
            if new_filepath is None or new_filepath is False:
                self.debug("before_move() returned None or False "
                           "=> we do nothing more")
                return True
        except Exception:
            self.exception("exception during before_move() => failure")
            return False
        if self.force_chmod != "":
            chmod_mode = int(self.force_chmod, 8)
        else:
            chmod_mode = None
        # Store old xaf filepath to display in the logs
        old_filepath = xaf.filepath
        if self.drop_tags:
            xaf.clear_tags()
        success, moved = xaf.move_or_copy(new_filepath,
                                          chmod_mode_int=chmod_mode)
        if not success:
            self.warning("Can't move/copy %s to %s", xaf.filepath,
                         new_filepath)
            return False
        if moved:
            self.info("%s moved into %s", old_filepath, new_filepath)
        else:
            self.info("%s copied into %s", xaf.filepath, new_filepath)
        return True
Ancestors
Subclasses
Methods
def add_dest_arguments(self, parser)
def add_force_chmod_argument(self, parser)
def before_move(self, xaf)
def compute_basename(self, xaf)
Inherited members
class AcquisitionReinjectStep
-
Class to describe a re-inject acquisition step.
Constructor.
Expand source code
class AcquisitionReinjectStep(AcquisitionStep):
    """
    Class to describe a re-inject acquisition step.

    Incoming files are kept in memory with their arrival time and their
    "attempt" tag; on ping(), each file is reinjected in dest_dir after a
    delay, or given up once retry_total attempts are reached.
    """

    debug_mode_allowed = False

    def add_extra_arguments(self, parser):
        """Declare the dest-dir and retry command line options."""
        parser.add_argument('--dest-dir', action='store',
                            default="FIXME",
                            help='destination directory (can be an absolute '
                            'path or something like "plugin_name/step_name")')
        parser.add_argument('--retry-min-wait', action='store',
                            type=float, default=10)
        parser.add_argument('--retry-max-wait', action='store',
                            type=float, default=120)
        parser.add_argument('--retry-total', action='store',
                            type=int, default=10)
        parser.add_argument('--retry-backoff', action='store',
                            type=float, default=0)

    def _init(self):
        """Validate dest-dir and read the retry configuration.

        Raises:
            Exception: if dest-dir is unset or not of the form
                plugin_name/step_name.
        """
        AcquisitionStep._init(self)
        # filepath => (arrival time, attempt number, xaf)
        self.__xafs = {}
        if self.args.dest_dir is None:
            raise Exception('you have to set a dest-dir argument')
        if self.args.dest_dir == "FIXME":
            raise Exception('you have to set a dest-dir argument')
        if '/' not in self.args.dest_dir:
            raise Exception("invalid dest_dir: %s" % self.args.dest_dir)
        if self.args.dest_dir.startswith('/'):
            raise Exception("invalid dest_dir: %s, must be something like: "
                            "plugin_name/step_name" % self.args.dest_dir)
        plugin_name = self.args.dest_dir.split('/')[0]
        step_name = self.args.dest_dir.split('/')[1]
        self.add_virtual_trace(plugin_name, step_name)
        self.add_virtual_trace(">delete", "giveup")
        self.dest_dir = get_plugin_step_directory_path(plugin_name,
                                                       step_name)
        mkdir_p_or_die(self.dest_dir)
        self.retry_total = self.args.retry_total
        self.retry_min_wait = self.args.retry_min_wait
        self.retry_max_wait = self.args.retry_max_wait
        self.retry_backoff = self.args.retry_backoff

    def destroy(self):
        """Rename back and forth the files still waiting for a retry."""
        self.debug("destroy called")
        # we generate a IN_MOVE event on remaining files to generate new
        # events on redis (to be sure that remaining files will be checked
        # at startup)
        filepaths = list(self.__xafs.keys())
        for filepath in filepaths:
            try:
                xaf = self.__xafs[filepath][2]
            except KeyError:
                # the entry vanished => nothing to do for this filepath
                continue
            new_filepath = filepath + ".t"
            self.debug("move %s to %s" % (filepath, new_filepath))
            xaf.rename(new_filepath)
            xaf2 = XattrFile(new_filepath)
            self.debug("move %s to %s" % (new_filepath, filepath))
            xaf2.rename(filepath)

    def _before(self, xaf, **kwargs):
        """Register the file for a later reinjection.

        Always returns False (the file is not processed immediately;
        the work is done by ping()).
        """
        if self.retry_total <= 0:
            self.info("retry_total <= 0 => let's delete the file")
            xaf.delete_or_nothing()
            return False
        if xaf.filepath not in self.__xafs:
            self._set_before_tags(xaf)
            retry_attempt = int(self.get_tag(xaf, "attempt", "0"))
            self.__xafs[xaf.filepath] = (time.time(), retry_attempt, xaf)
        return False

    def delay(self, attempt_number):
        """Return the delay (in seconds) to wait before the given attempt.

        The delay is retry_min_wait + retry_backoff * 2^(attempt_number - 1)
        capped to retry_max_wait.

        Args:
            attempt_number (int): attempt number (first attempt: 1).

        Returns:
            The delay in seconds.
        """
        tmp = self.retry_min_wait + \
            self.retry_backoff * (2 ** (attempt_number - 1))
        return min(tmp, self.retry_max_wait)

    def reinject(self, xaf, retry_attempt):
        """Move (or copy) the file into dest_dir for a new attempt.

        Args:
            xaf: file object to reinject.
            retry_attempt (int): number of attempts already done.
        """
        self._set_tag_latest(xaf, "attempt", str(retry_attempt + 1))
        filepath = xaf.filepath
        self.info("reinjecting %s into %s/... attempt %d", filepath,
                  self.dest_dir, retry_attempt + 1)
        new_filepath = os.path.join(self.dest_dir,
                                    get_unique_hexa_identifier())
        # NOTE(review): the result of move_or_copy() is not checked
        xaf.move_or_copy(new_filepath)
        self.add_trace(xaf, self.dest_dir)
        self.get_stats_client().incr("number_of_processed_files", 1)
        self.get_stats_client().incr("bytes_of_processed_files",
                                     xaf.getsize())

    def give_up(self, xaf):
        """Invoke the trash policy on a file which reached the max retry."""
        self.warning("max retry attempt for %s => invoking trash policy",
                     xaf.filepath)
        self.get_stats_client().incr("number_of_processing_errors", 1)
        self._trash(xaf)

    def ping(self):
        """Reinject (or give up) the files whose delay is over."""
        now = time.time()
        filepaths = list(self.__xafs.keys())
        for filepath in filepaths:
            try:
                if not os.path.exists(filepath):
                    # the file vanished => we forget it
                    del self.__xafs[filepath]
                    continue
                mtime, retry_attempt, xaf = self.__xafs[filepath]
                if retry_attempt >= self.retry_total:
                    del self.__xafs[filepath]
                    self.give_up(xaf)
                    # the file followed the trash policy => it must not
                    # be reinjected
                    continue
                delay = self.delay(retry_attempt + 1)
                if now - mtime >= delay:
                    self.reinject(xaf, retry_attempt)
                    del self.__xafs[filepath]
            except KeyError:
                pass
Ancestors
Class variables
var debug_mode_allowed
Methods
def delay(self, attempt_number)
def give_up(self, xaf)
def reinject(self, xaf, retry_attempt)
Inherited members
class AcquisitionStatsDClient (plugin_name, step_name, extra_tags={})
-
Expand source code
class AcquisitionStatsDClient(object):
    """Base class of statsd-like clients for acquisition steps.

    The metric methods (gauge, set, timing, incr, decr, timer, pipeline)
    raise NotImplementedError here and have to be provided by subclasses.
    This class only builds the metric names (prefix + stat + tags suffix).
    """

    plugin_name = None
    step_name = None
    # cache of the tags suffix (computed once by _get_suffix())
    __suffix_cache = None

    def __init__(self, plugin_name, step_name, extra_tags=None):
        """Constructor.

        Args:
            plugin_name (string): plugin name (or None).
            step_name (string): step name (or None).
            extra_tags (dict): extra key/values appended to the suffix
                (default: no extra tag).
        """
        self.plugin_name = plugin_name
        self.step_name = step_name
        # a fresh dict per instance when nothing is given (a shared
        # mutable default would leak tags between clients)
        self.extra_tags = {} if extra_tags is None else extra_tags

    def gauge(self, stat, value):
        raise NotImplementedError()

    def set(self, stat, value):
        raise NotImplementedError()

    def timing(self, stat, delta):
        raise NotImplementedError()

    def incr(self, stat, count):
        raise NotImplementedError()

    def decr(self, stat, count):
        raise NotImplementedError()

    def timer(self, stat):
        raise NotImplementedError()

    def pipeline(self):
        raise NotImplementedError()

    def _get_prefix(self):
        """Return the prefix of metric names (empty here)."""
        return ""

    def _get_suffix(self):
        """Return the (cached) comma separated tags suffix.

        The suffix starts with a comma and holds the module and host tags,
        then the plugin/step tags (when not None) and the extra tags.
        """
        if self.__suffix_cache is None:
            # the leading empty item gives the leading comma
            tmp = ["", "module=%s" % MFMODULE.lower(), "host=%s" % HOSTNAME]
            if self.plugin_name is not None:
                tmp.append("plugin=%s" % self.plugin_name)
            if self.step_name is not None:
                tmp.append("step=%s" % self.step_name)
            tmp.extend("%s=%s" % (k, v) for k, v in self.extra_tags.items())
            self.__suffix_cache = ",".join(tmp)
        return self.__suffix_cache

    def _stat(self, stat):
        """Return the full metric name (prefix + stat + suffix)."""
        return "%s%s%s" % (self._get_prefix(), stat, self._get_suffix())
Subclasses
Class variables
var plugin_name
var step_name
Methods
def decr(self, stat, count)
def gauge(self, stat, value)
def incr(self, stat, count)
def pipeline(self)
def set(self, stat, value)
def timer(self, stat)
def timing(self, stat, delta)
class AcquisitionStep
-
Abstract class to describe an acquisition step.
You have to override this class.
Attributes
stop_flag
:boolean
- if True, stop the daemon as soon as possible.
debug_mode_allowed
:boolean
- if True, the debug mode is allowed.
step_limit
:int
- maximum step number (to avoid some loops).
Constructor.
Expand source code
class AcquisitionStep(AcquisitionBase):
    """Abstract class to describe an acquisition step.

    You have to override this class.

    Attributes:
        stop_flag (boolean): if True, stop the daemon as soon as possible.
        debug_mode_allowed (boolean): if True, the debug mode is allowed.
        step_limit (int): maximum step number (to avoid some loops).

    """

    stop_flag = False
    debug_mode_allowed = True
    unit_tests = False
    unit_tests_args = None
    failure_policy = None
    step_limit = DEFAULT_STEP_LIMIT
    __last_ping = None
    _debug_mode = False

    def _init(self):
        """Validate the failure policy arguments and install SIGTERM handler.

        In case of "move" failure policy, the destination directory is
        resolved (absolute path or plugin_name/step_name) and created.
        """
        super(AcquisitionStep, self)._init()
        self.failure_policy = self.args.failure_policy
        self.failure_policy_move_dest_dir = None
        self.failure_policy_move_absolute_dir = False
        if self.failure_policy not in ("keep", "delete", "move"):
            self.error_and_die(
                "unknown failure policy: %s", self.failure_policy
            )
        if self.failure_policy == "move":
            fpmdd = self.args.failure_policy_move_dest_dir
            if fpmdd is None or fpmdd == "" or fpmdd == "FIXME":
                self.error_and_die(
                    "you have to set a "
                    "failure-policy-move-dest-dir"
                    " in case of move failure policy"
                )
            if "/" not in fpmdd:
                self.error_and_die(
                    "failure-policy-move-dest-dir must be something like "
                    "plugin_name/step_name or an absolute path")
            if fpmdd.startswith('/'):
                # absolute dir
                mkdir_p_or_die(fpmdd)
                self.add_virtual_trace(fpmdd)
                self.failure_policy_move_dest_dir = fpmdd
                self.failure_policy_move_absolute_dir = True
            else:
                # plugin_name/step_name (only the two first parts are used)
                parts = fpmdd.split('/')
                plugin_name = parts[0]
                step_name = parts[1]
                self.add_virtual_trace(plugin_name, step_name)
                self.failure_policy_move_dest_dir = \
                    get_plugin_step_directory_path(plugin_name, step_name)
                mkdir_p_or_die(self.failure_policy_move_dest_dir)
        signal.signal(signal.SIGTERM, self.__sigterm_handler)

    def _add_extra_arguments_before(self, parser):
        """Add the redis and failure policy options to the parser."""
        parser.add_argument(
            "--redis-host",
            action="store",
            default="127.0.0.1",
            help="redis host to connect to (in daemon mode)",
        )
        parser.add_argument(
            "--redis-port",
            action="store",
            default=6379,
            help="redis port to connect to (in daemon mode)",
        )
        parser.add_argument(
            "--redis-unix-socket-path",
            action="store",
            default=None,
            help="path to redis unix socket (overrides "
            "redis-host and redis-port if set)",
        )
        parser.add_argument(
            "--failure-policy",
            action="store",
            default="keep",
            help="failure policy (keep, delete or move)",
        )
        parser.add_argument(
            "--failure-policy-move-dest-dir",
            action="store",
            default=None,
            help="dest-dir in case of move failure policy (must be something "
            "like plugin_name/step_name or an absolute dir path)",
        )

    def _add_extra_arguments_after(self, parser):
        """Add the positional filepath/queue name argument to the parser."""
        parser.add_argument(
            "FULL_FILEPATH_OR_QUEUE_NAME",
            action="store",
            help="if starts with /, we consider we are "
            "in debug mode, if not, we consider we are in "
            "daemon mode",
        )

    def _exception_safe_call(
        self, func, args, kwargs, label, return_value_if_exception
    ):
        """Call func(*args, **kwargs) and never let an Exception escape.

        If an Exception is raised, it is logged (with the given label) and
        return_value_if_exception is returned instead.
        """
        try:
            return func(*args, **kwargs)
        except Exception:
            self.exception("exception during %s" % label)
            return return_value_if_exception

    def _process(self, xaf, force_original_filepath=None):
        """Run the full processing cycle on a file.

        Calls _before(), then process() (exception safe), then _after(),
        and updates timer/counter stats.

        Args:
            xaf (XattrFile): file to process.
            force_original_filepath (string): if not None, path logged as
                the original filepath (instead of xaf.filepath).

        Returns:
            False if _before() returned False, else the status returned
            by _after().
        """
        if force_original_filepath is not None:
            xaf._original_filepath = force_original_filepath
        else:
            xaf._original_filepath = xaf.filepath
        self.info("Start the processing of %s...", xaf._original_filepath)
        timer = self.get_stats_client().timer("processing_file_timer")
        timer.start()
        before_status = self._before(xaf)
        if before_status is False:
            # the timer value is not sent in this case
            timer.stop(send=False)
            self.info(
                "End of the %s processing after %i ms "
                "(before_status=False)",
                xaf._original_filepath,
                timer.ms,
            )
            return False
        # the size is read before process() (the file can be moved/deleted
        # during the processing)
        size = xaf.getsize()
        process_status = self._exception_safe_call(
            self.process,
            [xaf],
            {},
            "process of %s" % xaf._original_filepath,
            False,
        )
        after_status = self._after(xaf, process_status)
        self.get_stats_client().incr("number_of_processed_files", 1)
        self.get_stats_client().incr("bytes_of_processed_files", size)
        if not after_status:
            self.get_stats_client().incr("number_of_processing_errors", 1)
        timer.stop()
        self.info(
            "End of the %s processing after %i ms",
            xaf._original_filepath,
            timer.ms,
        )
        if not after_status:
            self.warning(
                "Bad processing status for file: %s", xaf._original_filepath
            )
            logger = self._get_logger()
            if logger.isEnabledFor(10):  # DEBUG
                xaf.dump_tags_on_logger(logger, 10)  # DEBUG
        return after_status

    def _before(self, xaf):
        """Move the file to a temporary path and set the "before" tags.

        Returns:
            boolean: False if the file can't be moved or if the step counter
                is bigger than step_limit, True otherwise.
        """
        if "first.core.original_uid" in xaf.tags:
            forced_uid = \
                xaf.tags['first.core.original_uid'].decode('utf8')
            tmp_filepath = self.get_tmp_filepath(
                forced_basename=forced_uid)
        else:
            tmp_filepath = self.get_tmp_filepath()
            forced_uid = os.path.basename(tmp_filepath)
        self.info("Move %s to %s (to process it)", xaf.filepath, tmp_filepath)
        try:
            xaf.rename(tmp_filepath)
        except (IOError, OSError):
            self.debug(
                "Can't move %s to %s: file does not "
                "exist any more ?",
                xaf.filepath,
                tmp_filepath,
            )
            return False
        self._set_original_uid_if_necessary(xaf, forced_uid=forced_uid)
        xaf._before_process_filepath = xaf.filepath
        self._set_before_tags(xaf)
        if self._get_counter_tag_value(xaf) > self.step_limit:
            # NOTE(review): the message mentions a deletion but no delete
            # call is done here (the file stays at its temporary path)
            # => to confirm.
            self.warning(
                "step_value bigger than step_limit [%i] => "
                "deleting file to avoid a recursion loop ?"
                % self.step_limit
            )
            return False
        return True

    def _trash(self, xaf):
        """Apply the configured failure policy (delete, keep or move)."""
        if self.failure_policy == "delete":
            xaf.delete_or_nothing()
            self.add_trace(xaf, ">trash", "delete")
        elif self.failure_policy == "keep":
            self.add_trace(xaf, ">trash", "keep")
            new_filepath = os.path.join(
                _get_or_make_trash_dir(self.plugin_name, self.step_name),
                xaf.basename(),
            )
            success, _ = xaf.move_or_copy(new_filepath)
            if success:
                # tags are kept in a side file and cleared on the file
                xaf.write_tags_in_a_file(new_filepath + ".tags")
                xattrfile.XattrFile(new_filepath).clear_tags()
            else:
                xaf.delete_or_nothing()
        elif self.failure_policy == "move":
            new_filepath = os.path.join(
                self.failure_policy_move_dest_dir, xaf.basename()
            )
            success, _ = xaf.move_or_copy(new_filepath)
            if not success:
                xaf.delete_or_nothing()
            else:
                if self.failure_policy_move_absolute_dir:
                    xaf.write_tags_in_a_file(new_filepath + ".tags")
                    xattrfile.XattrFile(new_filepath).clear_tags()
                self.add_trace(xaf, self.failure_policy_move_dest_dir)

    def _after(self, xaf, process_status):
        """Clean up (or trash) the file after process(), then call after().

        If the status is ok and the file is still at its "before process"
        path, it is deleted. If the status is not ok, the failure policy
        is applied.

        Returns:
            the given process_status (unchanged).
        """
        if process_status:
            if hasattr(xaf, "_before_process_filepath"):
                if xaf._before_process_filepath:
                    if xaf._before_process_filepath == xaf.filepath:
                        xaf.delete_or_nothing()
        else:
            self._trash(xaf)
        self._exception_safe_call(
            self.after, [process_status], {}, "after %s" % xaf.filepath, False
        )
        return process_status

    def _ping(self):
        """Record the ping time and call ping() (exception safe)."""
        self.__last_ping = datetime.now(timezone.utc).replace(tzinfo=None)
        self._exception_safe_call(self.ping, [], {}, "ping", False)

    def __ping_if_needed(self):
        """Call _ping() if never called or if the last call is > 1s old."""
        if self.stop_flag:
            return
        if self.__last_ping is None:
            return self._ping()
        delta = (datetime.now(timezone.utc).replace(tzinfo=None) -
                 self.__last_ping).total_seconds()
        if delta > 1:
            self._ping()

    def __run_in_daemon_mode(
        self, redis_host, redis_port, redis_queue, redis_unix_socket_path=None
    ):
        """Pop messages from the redis queue and process corresponding files.

        The loop runs until stop_flag is set (SIGTERM, KeyboardInterrupt,
        10 consecutive redis connection errors or an undecodable message).
        """
        self.info("Start the daemon mode with redis_queue=%s", redis_queue)
        if redis_unix_socket_path:
            r = redis.StrictRedis(unix_socket_path=redis_unix_socket_path)
        else:
            r = redis.StrictRedis(host=redis_host, port=redis_port)
        redis_connection_exceptions_counter = 0
        while not self.stop_flag:
            self.__ping_if_needed()
            try:
                msg = r.brpop(redis_queue, 1)
            except KeyboardInterrupt:
                self.stop_flag = True
                continue
            except redis.exceptions.ConnectionError:
                redis_connection_exceptions_counter = (
                    redis_connection_exceptions_counter + 1
                )
                if redis_connection_exceptions_counter >= 10:
                    self.critical("Can't connect to redis after 10s => exit")
                    self.stop_flag = True
                    continue
                else:
                    self.debug(
                        "Can't connect to redis => "
                        "I will try again after 1s sleep"
                    )
                    time.sleep(1)
                    continue
            redis_connection_exceptions_counter = 0
            if msg is None:
                # brpop timeout
                continue
            try:
                decoded_msg = json.loads(msg[1].decode("utf8"))
                filepath = os.path.join(
                    decoded_msg["directory"], decoded_msg["filename"]
                )
            except Exception:
                self.warning(
                    "wrong message type popped on bus "
                    "=> stopping to force a restart"
                )
                self.stop_flag = True
                continue
            if not os.path.exists(filepath):
                self.debug(
                    "filepath: %s does not exist anymore "
                    "=> ignoring event",
                    filepath,
                )
                continue
            xaf = xattrfile.XattrFile(filepath)
            counter = "counter.%s.%s" % (self.plugin_name, self.step_name)
            latest = "latest.%s.%s" % (self.plugin_name, self.step_name)
            # redis bookkeeping is best-effort: a failure is only logged
            try:
                r.incr(counter)
            except Exception:
                self.warning("can't update redis counter: %s" % counter)
            try:
                r.set(latest, str(get_utc_unix_timestamp()))
            except Exception:
                self.warning("can't set latest timestamp: %s" % latest)
            self._process(xaf)
        self.debug("Stop to brpop queue %s" % redis_queue)

    def __run_in_debug_mode(self, filepath):
        """Process a copy of the given file with the lowest logger level.

        Returns:
            the status returned by _process().
        """
        self.info("Start the debug mode with filepath=%s", filepath)
        self._get_logger().setLevel(0)
        self._debug_mode = True
        self._ping()
        tmp_filepath = self.get_tmp_filepath()
        original_xaf = xattrfile.XattrFile(filepath)
        # we work on a copy (the original file is not given to _process)
        xaf = original_xaf.copy(tmp_filepath)
        return self._process(xaf, force_original_filepath=filepath)

    def get_stats_client(self, extra_tags={}):
        """Return a stats client for this plugin/step.

        Args:
            extra_tags (dict): extra tags given to the stats client
                (only forwarded, never modified here).
        """
        return get_stats_client(self.plugin_name, self.step_name, extra_tags)

    def get_plugin_directory_path(self):
        """Return the plugin directory (fullpath).

        Returns:
            (string) the fullpath of the plugin directory.

        """
        return os.path.join(
            MFMODULE_RUNTIME_HOME, "var", "plugins", self.plugin_name
        )

    def __xxx_to_plugin_step_get_basename(self, xaf, keep_original_basename):
        """Compute the target basename for the xxx_to_plugin_step methods.

        Args:
            xaf (XattrFile): file to consider.
            keep_original_basename (boolean): True => basename of xaf,
                False => new unique identifier, None => value of the
                first.core.original_uid tag (if it exists, else same
                behavior as False).

        Raises:
            Exception: if keep_original_basename is not True, False or None.
        """
        if keep_original_basename is None:
            if "first.core.original_uid" in xaf.tags:
                return xaf.tags["first.core.original_uid"].decode('utf8')
            else:
                keep_original_basename = False
        if keep_original_basename is True:
            return xaf.basename()
        elif keep_original_basename is False:
            return get_unique_hexa_identifier()
        else:
            # the value is interpolated in the message (wrapped in a tuple
            # to be safe if the invalid value is itself a tuple)
            raise Exception("invalid value for keep_original_basename: %s "
                            "(must be True, False or None)"
                            % (keep_original_basename,))

    def hardlink_to_plugin_step(self, xaf, plugin_name, step_name,
                                keep_original_basename=False, info=False):
        """Hardlink (or copy) a XattrFile to another plugin/step.

        Args:
            xaf (XattrFile): XattrFile to hardlink (or copy).
            plugin_name (string): plugin name.
            step_name (string): step name.
            keep_original_basename (boolean): if True, we keep the original
                basename of xaf. If False, we generate a new basename. If
                None, we use the value of first.core.original_uid tag as
                basename (if exists).
            info (boolean): if true, add INFO log messages.

        Returns:
            boolean: True if ok.

        """
        basename = \
            self.__xxx_to_plugin_step_get_basename(xaf, keep_original_basename)
        old_filepath = xaf.filepath
        target_path = os.path.join(
            get_plugin_step_directory_path(plugin_name, step_name),
            basename,
        )
        result, hardlinked = xaf.hardlink_or_copy(target_path)
        if info and result:
            if hardlinked:
                self.info("File %s hardlinked to %s" %
                          (old_filepath, target_path))
            else:
                self.info("File %s copied to %s (can't hardlink)" %
                          (old_filepath, target_path))
        if result:
            self.add_trace(xaf, plugin_name, step_name)
        return result

    def move_to_plugin_step(self, xaf, plugin_name, step_name,
                            keep_original_basename=False, info=False):
        """Move (or copy) a XattrFile to another plugin/step.

        Args:
            xaf (XattrFile): XattrFile to move.
            plugin_name (string): plugin name.
            step_name (string): step name.
            keep_original_basename (boolean): if True, we keep the original
                basename of xaf. If False, we generate a new basename. If
                None, we use the value of first.core.original_uid tag as
                basename (if exists).
            info (boolean): if true, add INFO log messages.

        Returns:
            boolean: True if ok.

        """
        basename = \
            self.__xxx_to_plugin_step_get_basename(xaf, keep_original_basename)
        old_filepath = xaf.filepath
        target_path = os.path.join(
            get_plugin_step_directory_path(plugin_name, step_name),
            basename,
        )
        result, moved = xaf.move_or_copy(target_path)
        if info and result:
            if moved:
                self.info("File %s moved to %s" %
                          (old_filepath, target_path))
            else:
                self.info("File %s copied to %s (can't move)" %
                          (old_filepath, target_path))
        if result:
            self.add_trace(xaf, plugin_name, step_name)
        return result

    def copy_to_plugin_step(self, xaf, plugin_name, step_name,
                            keep_original_basename=False, info=False):
        """Copy a XattrFile (with tags) to another plugin/step.

        Args:
            xaf (XattrFile): XattrFile to copy.
            plugin_name (string): plugin name.
            step_name (string): step name.
            keep_original_basename (boolean): if True, we keep the original
                basename of xaf. If False, we generate a new basename. If
                None, we use the value of first.core.original_uid tag as
                basename (if exists).
            info (boolean): if true, add INFO log messages.

        Returns:
            boolean: True if ok

        """
        basename = \
            self.__xxx_to_plugin_step_get_basename(xaf, keep_original_basename)
        old_filepath = xaf.filepath
        target_path = os.path.join(
            get_plugin_step_directory_path(plugin_name, step_name),
            basename,
        )
        result = xaf.copy_or_nothing(target_path)
        if info and result:
            self.info("File %s copied to %s" % (old_filepath, target_path))
        if result:
            self.add_trace(xaf, plugin_name, step_name)
        return result

    def process(self, xaf):
        """Process one file.

        Process one XattrFile. You have to override this method (unless your
        class inherits from BatchStep, see batch_process() in that case).

        The file is moved into a temporary directory before the call with
        a unique filename. Extended attributes are copied to it.
        So you can do what you want with it.

        If the method returns True:

        - we consider that the processing is ok
        - the file is deleted if necessary

        If the method doesn't return True:

        - we consider that the processing is not ok (a warning message is
            logged).
        - the file is given to the chosen failure policy.

        Args:
            xaf : XattrFile object.

        Returns:
            boolean: processing status (True: ok, False: not ok)

        """
        raise NotImplementedError("process() method must be overriden")

    def after(self, status):
        """Method called after the process execution.

        It's called even in case of exceptions during process.

        """
        return

    def ping(self):
        """Do something every second if possible.

        The call can be blocked by a long running process() call.

        """
        return

    def add_extra_arguments(self, parser):
        """Add some extra arguments to the command line parsing.

        If you have to add some, you have to override this method.

        Args:
            parser: an ArgumentParser object (with default options added).

        """
        pass

    def run(self):
        """Run the step in debug mode (absolute filepath) or daemon mode."""
        self._init()
        if self.args.FULL_FILEPATH_OR_QUEUE_NAME.startswith("/"):
            if not self.debug_mode_allowed:
                self.error_and_die("debug mode is not allowed for this step")
            self.__run_in_debug_mode(self.args.FULL_FILEPATH_OR_QUEUE_NAME)
        else:
            self.__run_in_daemon_mode(
                self.args.redis_host,
                self.args.redis_port,
                self.args.FULL_FILEPATH_OR_QUEUE_NAME,
                self.args.redis_unix_socket_path,
            )
        self._destroy()

    def get_original_basename(self, xaf):
        """Return the original basename of the file.

        The original basename is the unmodified basename before step0
        execution. "unknown" is returned if we can't find the basename.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original basename.

        """
        tag_name = self._get_original_basename_tag_name()
        return xaf.tags.get(tag_name, b"unknown").decode("utf8")

    def get_original_uid(self, xaf):
        """Return the original uid of the file.

        The original uid is the first unique id given before step0 execution.
        "unknown" is returned if we can't find the original uid.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original uid.

        """
        tag_name = self._get_original_uid_tag_name()
        return xaf.tags.get(tag_name, b"unknown").decode("utf8")

    def get_original_dirname(self, xaf):
        """Return the original dirname of the file.

        The original dirname is the unmodified dirname before step0
        execution. "unknown" is returned if we can't find the dirname.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original dirname.

        """
        tag_name = self._get_original_dirname_tag_name()
        return xaf.tags.get(tag_name, b"unknown").decode("utf8")

    def __sigterm_handler(self, *args):
        """SIGTERM handler: only set stop_flag (the main loop will exit)."""
        self.debug("SIGTERM signal handled => schedulling shutdown")
        self.stop_flag = True
Ancestors
Subclasses
- AcquisitionBatchStep
- AcquisitionCopyStep
- AcquisitionDeleteStep
- AcquisitionMoveStep
- AcquisitionReinjectStep
- AcquisitionTransformStep
Class variables
var debug_mode_allowed
var failure_policy
var step_limit
var stop_flag
var unit_tests
var unit_tests_args
Methods
def add_extra_arguments(self, parser)
-
Add some extra arguments to the command line parsing.
If you have to add some, you have to override this method.
Args
parser
- an ArgumentParser object (with default options added).
def after(self, status)
-
Method called after the process execution.
It's called even in case of exceptions during process.
def copy_to_plugin_step(self, xaf, plugin_name, step_name, keep_original_basename=False, info=False)
-
Copy a XattrFile (with tags) to another plugin/step.
Args
xaf
:XattrFile
- XattrFile to copy.
plugin_name
:string
- plugin name.
step_name
:string
- step name.
keep_original_basename
:boolean
- if True, we keep the original basename of xaf. If False, we generate a new basename. If None, we use the value of the first.core.original_uid tag as basename (if it exists).
info
:boolean
- if true, add INFO log messages.
Returns
boolean
- True if ok
def get_original_basename(self, xaf)
-
Return the original basename of the file.
The original basename is the unmodified basename before step0 execution. "unknown" is returned if we can't find the basename.
Args
xaf
:XattrFile
- file to consider.
Returns
string
- original basename.
def get_original_dirname(self, xaf)
-
Return the original dirname of the file.
The original dirname is the unmodified dirname before step0 execution. "unknown" is returned if we can't find the dirname.
Args
xaf
:XattrFile
- file to consider.
Returns
string
- original dirname.
def get_original_uid(self, xaf)
-
Return the original uid of the file.
The original uid is the first unique id given before step0 execution. "unknown" is returned if we can't find the original uid.
Args
xaf
:XattrFile
- file to consider.
Returns
string
- original uid.
def get_stats_client(self, extra_tags={})
def hardlink_to_plugin_step(self, xaf, plugin_name, step_name, keep_original_basename=False, info=False)
-
Hardlink (or copy) a XattrFile to another plugin/step.
Args
xaf
:XattrFile
- XattrFile to hardlink (or copy).
plugin_name
:string
- plugin name.
step_name
:string
- step name.
keep_original_basename
:boolean
- if True, we keep the original basename of xaf. If False, we generate a new basename. If None, we use the value of the first.core.original_uid tag as basename (if it exists).
info
:boolean
- if true, add INFO log messages.
Returns
boolean
- True if ok.
def move_to_plugin_step(self, xaf, plugin_name, step_name, keep_original_basename=False, info=False)
-
Move (or copy) a XattrFile to another plugin/step.
Args
xaf
:XattrFile
- XattrFile to move.
plugin_name
:string
- plugin name.
step_name
:string
- step name.
keep_original_basename
:boolean
- if True, we keep the original basename of xaf. If False, we generate a new basename. If None, we use the value of the first.core.original_uid tag as basename (if it exists).
info
:boolean
- if true, add INFO log messages.
Returns
boolean
- True if ok.
def ping(self)
-
Do something every second if possible.
The call can be blocked by a long running process() call.
def process(self, xaf)
-
Process one file.
Process one XattrFile. You have to override this method (unless your class inherits from BatchStep, see batch_process() in that case).
The file is moved into a temporary directory before the call with a unique filename. Extended attributes are copied to it. So you can do what you want with it.
If the method returns True:
- we consider that the processing is ok
- the file is deleted if necessary
If the method doesn't return True:
- we consider that the processing is not ok (a warning message is logged).
- the file is given to the chosen failure policy.
Args
xaf : XattrFile object.
Returns
boolean
- processing status (True: ok, False: not ok)
def run(self)
Inherited members
class AcquisitionTransformStep
-
Abstract class to describe an acquisition step.
You have to override this class.
Attributes
stop_flag
:boolean
- if True, stop the daemon as soon as possible.
debug_mode_allowed
:boolean
- if True, the debug mode is allowed.
step_limit
:int
- maximum step number (to avoid some loops).
Constructor.
Expand source code
class AcquisitionTransformStep(AcquisitionStep):
    """Step which transforms a file and moves the result to another step.

    The destination is given by the --dest-dir argument which must be
    something like "plugin_name/step_name". Override transform() to
    implement the transformation.
    """

    dest_dir_default = "FIXME"

    def _init(self):
        """Parse and validate dest_dir (plugin_name/step_name)."""
        AcquisitionStep._init(self)
        dest_dir = self.args.dest_dir
        if dest_dir.startswith('/'):
            raise Exception("invalid dest_dir: %s, must be something like "
                            "plugin_name/step_name" % dest_dir)
        # a value without any '/' gives one single part, so it is rejected
        # by the same check (with the same message)
        parts = dest_dir.split('/')
        if len(parts) != 2:
            raise Exception("invalid dest_dir: %s" % dest_dir)
        self.dest_plugin_name, self.dest_step_name = parts
        self.add_virtual_trace(self.dest_plugin_name, self.dest_step_name)
        self.keep_transformed_basename = True

    def add_extra_arguments(self, parser):
        """Add the --dest-dir option to the parser."""
        parser.add_argument('--dest-dir', action='store',
                            default=self.dest_dir_default,
                            help='destination directory (something like '
                            '"plugin_name/step_name")')

    def transform(self, xaf):
        """Transform the file (default: return it unchanged).

        The returned value is given to process_out(): None, a XattrFile,
        a filepath (str) or an iterable of them.
        """
        return xaf

    def process_out(self, xaf, out):
        """Move the output(s) of transform() to the destination step.

        Args:
            xaf (XattrFile): the incoming file (source of the tags).
            out: None, a XattrFile, a filepath (str) or an iterable of them.

        Returns:
            boolean: True if there is nothing to do or if every move is ok.

        Raises:
            Exception: if out has an unsupported type.
        """
        if out is None:
            return True
        is_xaf = isinstance(out, XattrFile)
        if is_xaf or isinstance(out, str):
            if not is_xaf:
                # out is a filepath (str)
                tagged = xaf.copy_tags_on(out)
            elif "first.core.original_basename" in out.tags:
                # we consider that original tags are already copied
                tagged = out
            else:
                tagged = xaf.copy_tags_on(out.filepath)
            return self.move_to_plugin_step(
                tagged, self.dest_plugin_name, self.dest_step_name,
                keep_original_basename=self.keep_transformed_basename,
                info=True)
        if isinstance(out, Iterable):
            # every item is processed (no short-circuit on the first failure)
            statuses = [self.process_out(xaf, item) for item in out]
            return all(statuses)
        raise Exception("wrong output type: %s for transform() call" %
                        type(out).__name__)

    def process(self, xaf):
        """Call transform() on the file and forward its output.

        Returns:
            boolean: False if transform() raised an exception, True if it
                returned None, else the status of process_out().
        """
        self.info("Processing file: %s" % xaf.filepath)
        try:
            result = self.transform(xaf)
        except Exception:
            self.exception("exception during transform() call")
            return False
        if result is not None:
            return self.process_out(xaf, result)
        self.debug("transform() returned None => we do nothing more")
        return True
Ancestors
Subclasses
Class variables
var dest_dir_default
Methods
def process_out(self, xaf, out)
def transform(self, xaf)
Inherited members