Package acquisition

Sub-modules

acquisition.archive_step
acquisition.base
acquisition.batch_step
acquisition.copy_step
acquisition.decorators
acquisition.delete_step
acquisition.fork_step
acquisition.inject_file
acquisition.listener
acquisition.move_step
acquisition.reinject_step
acquisition.stats
acquisition.step
acquisition.switch_rules
acquisition.switch_step
acquisition.transform_step
acquisition.unbzip2_step
acquisition.uncompress_step
acquisition.ungzip_step
acquisition.utils

Classes

class AcquisitionArchiveStep

Class to describe an archive acquisition step.

Constructor.

Expand source code
class AcquisitionArchiveStep(AcquisitionMoveStep):
    """Move step specialised for archiving files in an absolute directory.

    The step forces ``drop_tags`` to True and can optionally dump the tags
    of each file in a side file (archived file path + configured suffix).

    """

    def add_extra_arguments(self, parser):
        """Declare the destination, chmod and tags-suffix CLI arguments.

        Args:
            parser: argument parser to populate.

        """
        self.add_dest_arguments(parser)
        self.add_force_chmod_argument(parser)
        parser.add_argument(
            '--keep-tags-suffix', action='store', default=".tags",
            help='suffix to add to the basename to keep tags'
            ' (put a null or empty value to have no tags at all')

    def _init(self):
        """Run the parent initialisation, then apply archive constraints.

        Raises:
            Exception: if the dest-dir argument is not an absolute path.

        """
        AcquisitionMoveStep._init(self)
        dest_is_absolute = self.args.dest_dir.startswith('/')
        if not dest_is_absolute:
            raise Exception("dest-dir must be an absolute directory")
        self.keep_tags_suffix = self.args.keep_tags_suffix.strip()
        # Tags are never kept on the archived file itself.
        self.drop_tags = True

    def before_move(self, xaf):
        """Compute the archive path and prepare its directory.

        Args:
            xaf: file object about to be moved.

        Returns:
            The target file path, or False if the target directory could
            not be created.

        """
        target_path = os.path.join(self.dest_dir, self.compute_basename(xaf))
        if not mkdir_p(os.path.dirname(target_path)):
            return False
        # An empty suffix or the literal "null" disables the tags side file.
        tags_wanted = self.keep_tags_suffix not in ("", "null")
        if tags_wanted:
            xaf.write_tags_in_a_file(target_path + self.keep_tags_suffix)
        return target_path

Ancestors

Methods

def before_move(self, xaf)

Inherited members

class AcquisitionBatchStep

Abstract class to describe a batch acquisition step.

You have to override this class.

Constructor.

Expand source code
class AcquisitionBatchStep(AcquisitionStep):
    """Abstract class to describe a batch acquisition step.

    Incoming files are accumulated in a batch; once the batch is ready
    (or when forced), the whole batch is handed to batch_process().

    You have to override this class.

    """

    # Current AcquisitionBatch, lazily created by the _batch property.
    __batch = None

    def __init__(self):
        super(AcquisitionBatchStep, self).__init__()

    @property
    def batch_process_max_size(self):
        """Return the max size of a batch in batch process mode.

        If not overridden, the default value is 100.

        """
        return 100

    @property
    def batch_process_max_wait(self):
        """Max wait (in seconds) to fill the batch in batch process mode.

        If not overridden, the default value is 10 (so 10 seconds).

        """
        return 10

    @property
    def _batch(self):
        """Return the current batch, creating it on first access."""
        # Lazy init of AcquisitionBatch (to be able to have dynamic
        # batch_process_max_size/batch_process_max_wait)
        if self.__batch is None:
            self.__batch = AcquisitionBatch(self.batch_process_max_size,
                                            self.batch_process_max_wait)
        return self.__batch

    def _reinit_batch(self):
        """Reinit the current batch."""
        self.__batch = None

    def process(self, xaf):
        """Not available in batch mode: always raises NotImplementedError."""
        raise NotImplementedError("process() method should not be called in "
                                  "batch process mode")

    def batch_process(self, xafs):
        """Process a batch of files.

        Process several XattrFile (between 1 and max_batch_size files in one
        call). You have to override this method.

        Files are moved into a temporary directory before the call with
        unique filenames. Extended attributes are copied to them.
        So you can do what you want with them.

        If the method returns True:
        - we consider that the processing is ok
        - all files are deleted if necessary

        If the method returns False:
        - we consider that the result is False for each file
        - so each file follows the failure policy

        If the method returns an array of booleans (of the same size as
        the xafs array), each boolean is the processing status of the
        corresponding file.

        Args:
            xafs (list): XattrFile objects (list of files to process).

        Returns:
            Processing status (True, False or array of booleans).

        """
        raise NotImplementedError("batch_process() method must be overriden "
                                  "in \"batch process mode\"")

    def _destroy(self):
        """Flush the pending batch (if any), then run the parent cleanup."""
        try:
            if self._batch.get_size():
                # we have a last batch to process
                # let's process it by force
                self._conditional_process_batch(force=True)
        except TypeError:
            # NOTE(review): TypeError is deliberately ignored here; the
            # reason is not visible from this class - confirm before
            # changing.
            pass
        super(AcquisitionBatchStep, self)._destroy()

    def _ping(self, *args, **kwargs):
        """Process the batch if it became ready, then ping the parent."""
        self._conditional_process_batch()
        super(AcquisitionBatchStep, self)._ping(*args, **kwargs)

    def _is_batch_ready(self, force=False):
        """Tell if the current batch must be processed now.

        Args:
            force (boolean): if True, any non-empty batch is ready.

        Returns:
            True if the batch must be processed.

        """
        if force and self._batch.get_size() > 0:
            return True
        return self._batch.is_ready()

    def _before(self, xaf):
        """Add the file to the current batch instead of processing it.

        Returns:
            Always False (the file is only queued here; the real work is
            done later by _process_batch()).

        """
        status = super(AcquisitionBatchStep, self)._before(xaf)
        if not status:
            return False
        self._batch.append(xaf)
        self.set_tag(xaf, "batch_id", self._batch.id, add_latest=False)
        self.info("File %s added in batch %s", xaf._original_filepath,
                  self._batch.id)
        return False

    def _conditional_process_batch(self, force=False):
        """Process and reset the batch if it is ready (or forced).

        In debug mode, a non-empty batch is always considered ready.

        """
        if self._is_batch_ready(force or self._debug_mode):
            self.debug("Batch %s is ready (size: %i, age: %s seconds) => "
                       "let's process it",
                       self._batch.id, self._batch.get_size(),
                       self._batch.get_age())
            self._process_batch()
            self._reinit_batch()
        else:
            if self._batch.get_size() > 0:
                self.debug("Batch %s is not ready (size: %i, age: %s seconds)",
                           self._batch.id, self._batch.get_size(),
                           self._batch.get_age())

    def _process(self, xaf):
        """Run the parent _process(), then process the batch if ready."""
        res = super(AcquisitionBatchStep, self)._process(xaf)
        self._conditional_process_batch()
        return res

    def _process_batch(self):
        """Call batch_process() on the current batch and update the stats."""
        self.info("Start the processing of batch %s...", self._batch.id)
        timer = self.get_stats_client().timer("processing_batch_timer")
        timer.start()
        xafs = self._batch.get_xafs()
        # Sizes are read before processing: files may be moved or deleted
        # by batch_process()/_after().
        size = sum([x.getsize() for x in xafs])
        process_status = \
            self._exception_safe_call(self.batch_process, [xafs], {},
                                      "batch_process", False)
        after_status = self._after_batch(xafs, process_status)
        self.get_stats_client().incr("number_of_processed_files", len(xafs))
        self.get_stats_client().incr("bytes_of_processed_files", size)
        self.get_stats_client().incr("number_of_processed_batches", 1)
        timer.stop()
        self.info("End of the processing of batch %s...", self._batch.id)
        if not after_status:
            self.get_stats_client().incr("number_of_processed_"
                                         "batches_error", 1)
            self.warning("Bad processing status for batch: %s", self._batch.id)

    def _after_batch(self, xafs, process_status):
        """Apply the per-file post-processing for a whole batch.

        Args:
            xafs (list): files of the batch.
            process_status: value returned by batch_process(): None
                (treated as False), a boolean applied to every file, or a
                sequence holding one status per file.

        Returns:
            A falsy value if at least one file has a bad status (or if the
            status sequence has a wrong length), a truthy value otherwise.

        """
        if process_status is None:
            process_status = False
        if isinstance(process_status, bool):
            for xaf in xafs:
                super(AcquisitionBatchStep, self)._after(xaf, process_status)
            return process_status
        if len(process_status) != len(xafs):
            self.warning("bad process status len(process_status) = %i "
                         "which is different than len(xafs) = %i" %
                         (len(process_status), len(xafs)))
            # Statuses can't be mapped to files: every file of the batch
            # is handled as a failure.
            for xaf in xafs:
                super(AcquisitionBatchStep, self)._after(xaf, False)
            return False
        for (xaf, pstatus) in zip(xafs, process_status):
            super(AcquisitionBatchStep, self)._after(xaf, pstatus)
        # all() gives the same result as min() for booleans and does not
        # raise on an empty sequence.
        return all(process_status)

Ancestors

Instance variables

prop batch_process_max_size

Return the max size of a batch in batch process mode.

If not overridden, the default value is 100.

Expand source code
@property
def batch_process_max_size(self):
    """Maximum number of files in a batch (batch process mode).

    Override this property to change the value; the default is 100.

    """
    default_max_size = 100
    return default_max_size
prop batch_process_max_wait

Max wait (in seconds) to fill the batch in batch process mode.

If not overridden, the default value is 10 (so 10 seconds).

Expand source code
@property
def batch_process_max_wait(self):
    """Maximum time (in seconds) to wait for a batch to fill up.

    Override this property to change the value; the default is 10 seconds.

    """
    default_max_wait = 10
    return default_max_wait

Methods

def batch_process(self, xafs)

Process a batch of files.

Process several XattrFile (between 1 and max_batch_size files in one call). You have to override this method.

Files are moved into a temporary directory (with unique filenames) before the call. Extended attributes are copied to them, so you can do what you want with them.

If the method returns True: - we consider that the processing is ok - all files are deleted if necessary

If the method returns False: - we consider that the result is False for each file - so each file follows the failure policy

If the method returns an array of booleans (of the same size as the xafs array), each boolean is taken as the processing status of the corresponding file.

Args

xafs : list
XattrFile objects (list of files to process).

Returns

Processing status (True, False or array of booleans).

Inherited members

class AcquisitionCopyStep

Abstract class to describe an acquisition step.

You have to override this class.

Attributes

stop_flag : boolean
if True, stop the daemon as soon as possible.
debug_mode_allowed : boolean
if True, the debug mode is allowed.
step_limit : int
maximum step number (to avoid some loops).

Constructor.

Expand source code
class AcquisitionCopyStep(AcquisitionStep):
    """Step which dispatches each incoming file to several plugin steps.

    Targets come from the --dest-dirs argument and are stored in
    self.dest_dirs as (plugin_name, step_name, hardlink) tuples.

    """

    def _init(self):
        """Parse the --dest-dirs argument into self.dest_dirs.

        Raises:
            Exception: if a target is not of the form "plugin/step".

        """
        AcquisitionStep._init(self)
        self.dest_dirs = []
        self.uid = os.getuid()
        raw_targets = self.args.dest_dirs
        if raw_targets == "":
            return
        for target in raw_targets.split(','):
            parts = target.split('/')
            # A valid target is relative and contains exactly one '/'.
            if '/' not in target or target.startswith('/') or \
                    len(parts) != 2:
                raise Exception("invalid target: %s in dest-dirs: %s" %
                                (target, raw_targets))
            plugin_name = parts[0].strip()
            # A trailing '*' on the step name requests hardlinking.
            hardlink = parts[1].endswith('*')
            raw_step = parts[1][:-1] if hardlink else parts[1]
            step_name = raw_step.strip()
            self.dest_dirs.append((plugin_name, step_name, hardlink))
            self.add_virtual_trace(plugin_name, step_name)

    def add_extra_arguments(self, parser):
        """Declare the --dest-dirs CLI argument."""
        help_text = (
            'coma separated target plugin/steps (example: plugin1/step1,'
            'plugin2/step2,plugin3/step3*) (if the step name ends with a "*", '
            "we use hardlink instead of copy (but the target step MUST NOT "
            "MODIFY the incoming file in any way, so don't add a '*' if "
            "you are not sure!)")
        parser.add_argument('--dest-dirs', action='store', default="",
                            help=help_text)

    def _fix_uid(self, xaf):
        """Replace a file owned by another uid by a copy owned by us.

        Returns:
            The copy (the original is then deleted), or the original file
            object if the copy failed.

        """
        self.info("the file: %s is owned by uid:%i and not by current "
                  "uid: %i => let's copy it to change its ownership",
                  xaf.filepath, xaf.getuid(), self.uid)
        tmp_path = self.get_tmp_filepath()
        try:
            copied = xaf.copy(tmp_path)
        except Exception:
            self.warning("can't copy %s to %s", xaf.filepath,
                         tmp_path)
            return xaf
        self.info("%s copied to %s", xaf.filepath, tmp_path)
        if not xaf.delete_or_nothing():
            self.warning("can't delete: %s", xaf.filepath)
        return copied

    def before_copy(self, xaf):
        """Hook returning the targets to use for this file.

        Returns:
            A list of (plugin_name, step_name, hardlink) tuples; None or
            False makes process() stop without doing anything else.

        """
        return self.dest_dirs

    def process(self, xaf):
        """Send the file to every target returned by before_copy().

        Returns:
            A truthy value on success, a falsy value otherwise.

        """
        self.info("Processing file: %s" % xaf.filepath)
        try:
            dest_dirs = self.before_copy(xaf)
            if dest_dirs is None or dest_dirs is False:
                self.debug("before_copy() returned None or False "
                           "=> we do nothing more")
                return True
        except Exception:
            self.exception("exception during before_copy() => failure")
            return False
        file_uid = xaf.getuid()
        fix_uid = file_uid is not None and file_uid != self.uid
        nb_targets = len(dest_dirs)
        if nb_targets == 0:
            self.info("No dest-dirs => deleting %s" % xaf.filepath)
            xaf.delete_or_nothing()
            return True
        if nb_targets == 1:
            # Shortcut for a single target. Whatever the hardlink flag:
            # - wrong owner => copy (so the new file is owned by the
            #   current user)
            # - otherwise   => a single move is enough
            plugin_name, step_name, hardlink = dest_dirs[0]
            if fix_uid:
                deliver = self.copy_to_plugin_step
            else:
                deliver = self.move_to_plugin_step
            return deliver(xaf, plugin_name, step_name, info=True,
                           keep_original_basename=True)
        # More than one target from here.
        can_hardlink = any(flag for _, _, flag in dest_dirs)
        if fix_uid and can_hardlink:
            xaf = self._fix_uid(xaf)
        result = True
        hardlink_used = False
        for plugin_name, step_name, hardlink in dest_dirs[:-1]:
            # "result and ..." short-circuits: after a first failure, the
            # remaining targets are not attempted.
            if hardlink:
                result = result and \
                    self.hardlink_to_plugin_step(
                        xaf, plugin_name, step_name,
                        info=True, keep_original_basename=True)
                hardlink_used = True
            else:
                result = result and \
                    self.copy_to_plugin_step(
                        xaf, plugin_name, step_name,
                        info=True, keep_original_basename=True)
        # The last target can be optimized when there is no error:
        # - last target allows hardlinking                    => move
        # - no hardlink allowed, hardlinks used before        => copy
        # - no hardlink allowed, no hardlink used before      => move
        plugin_name, step_name, hardlink = dest_dirs[-1]
        if not result:
            # Some errors: the intent is to copy, to keep the original
            # file for the trash policy.
            # NOTE(review): result is falsy here, so "result and ..."
            # short-circuits and this copy is never executed - confirm
            # whether that is intended.
            result = result and \
                self.copy_to_plugin_step(
                    xaf, plugin_name, step_name,
                    info=True, keep_original_basename=True)
            return result
        if hardlink or not hardlink_used:
            return result and \
                self.move_to_plugin_step(
                    xaf, plugin_name, step_name,
                    info=True, keep_original_basename=True)
        return result and \
            self.copy_to_plugin_step(
                xaf, plugin_name, step_name,
                info=True, keep_original_basename=True)

Ancestors

Subclasses

Methods

def before_copy(self, xaf)

Inherited members

class AcquisitionForkStep

Class to describe a fork acquisition step.

Constructor.

Expand source code
class AcquisitionForkStep(AcquisitionTransformStep):
    """
    Class to describe a fork acquisition step.

    The step runs an external command (built from a template) on each
    incoming file and can optionally collect file paths printed by the
    command on its standard output.

    """

    # Just to avoid an invalid parameter exception with
    # command_returning_path=0
    # (this value will never be used)
    dest_dir_default = "FIXME/FIXME"

    def add_extra_arguments(self, parser):
        """Declare the command-template/command-returning-path arguments."""
        AcquisitionTransformStep.add_extra_arguments(self, parser)
        parser.add_argument(
            '--command-template', action='store',
            default=None, help='command template to execute ({PATH} string '
            'will be replaced by the file fullpath, '
            '{{MFDATA_CURRENT_PLUGIN_DIR}} by the full plugin dir path)')
        parser.add_argument(
            '--command-returning-path', action='store',
            help='if set to 1, we will parse the command output '
            '(on stdout only) '
            'and lines starting with FILEPATH: string will be '
            'interpreted as a full filepath which will be injected in '
            'dest-dir with tags (WARNING: the returned filepath will be '
            'deleted)')

    def _init(self):
        """Validate the arguments of the step.

        Raises:
            Exception: if the command template is missing, does not contain
                {PATH}, or if command-returning-path is 1 without a valid
                dest_dir.

        """
        AcquisitionTransformStep._init(self)
        # The argument has no default value: it is None when the option is
        # not given on the command line, which means "disabled".
        returning_path = self.args.command_returning_path
        self.command_returning_path = \
            returning_path is not None and returning_path.strip() == "1"
        if self.args.command_template is None or \
                self.args.command_template == "FIXME":
            raise Exception('you have to set a command-template')
        if "{PATH}" not in self.args.command_template:
            raise Exception('{PATH} is not in the command template')
        if self.args.dest_dir == "FIXME/FIXME" and \
                self.command_returning_path:
            raise Exception("command_returning_path == 1 => you have to set "
                            "a valid dest_dir")

    def get_command(self, filepath):
        """Build the command line to execute for the given file.

        Args:
            filepath (string): full path substituted for {PATH}.

        Returns:
            The command template with its placeholders replaced.

        """
        return self.args.command_template\
            .replace('{PATH}', filepath)\
            .replace('{{MFDATA_CURRENT_PLUGIN_DIR}}',
                     self.get_plugin_directory_path())

    def transform(self, xaf):
        """Run the command on the file and collect the returned paths.

        Args:
            xaf: file object to process.

        Returns:
            The list of paths announced by the command (lines starting
            with "FILEPATH:") when command_returning_path is enabled;
            None in every other case (bad return code, invalid returned
            path, or feature disabled).

        """
        cmd = self.get_command(xaf.filepath)
        self.info("Calling %s ...", cmd)
        x = BashWrapper(cmd)
        if not x:
            self.warning("%s returned a bad return code: %i, details: %s",
                         cmd, x.code, x)
            return
        else:
            self.debug("%s returned a good return code, output: %s",
                       cmd, x)
        if self.command_returning_path:
            paths = []
            lines = [tmp.strip() for tmp in x.stdout.split("\n")]
            for line in lines:
                if line.startswith("FILEPATH:"):
                    path = line[len("FILEPATH:"):].strip()
                    # NOTE(review): a single invalid path aborts the whole
                    # result (None is returned), although the message says
                    # "ignoring" - confirm this is the intended behavior.
                    if not path.startswith('/'):
                        self.warning("returned path: %s does not start with / "
                                     "=> ignoring" % path)
                        return
                    if not os.path.exists(path):
                        self.warning("returned path: %s does not exist "
                                     "=> ignoring" % path)
                        return
                    self.debug("returned path = %s" % path)
                    paths.append(path)
            return paths

Ancestors

Class variables

var dest_dir_default

Methods

def get_command(self, filepath)
def transform(self, xaf)

Inherited members

class AcquisitionListener

Abstract class to describe an acquisition listener.

You have to override this class.

Constructor.

Expand source code
class AcquisitionListener(AcquisitionBase):
    """Abstract class to describe an acquisition listener.

    You have to override this class.

    """

    def listen(self):
        """Listen to something.

        You have to override this method.

        When this method returns, the execution is stopped.

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError("listen() method must be overriden")

    def run(self):
        """Initialize the listener, listen, then clean up.

        _destroy() is called even if listen() raises; the exception is
        then propagated to the caller.

        """
        self._init()
        try:
            self.listen()
        finally:
            self._destroy()

Ancestors

Methods

def listen(self)

Listen to something.

You have to override this method.

When this method returns, the execution is stopped.

def run(self)

Inherited members

class AcquisitionMoveStep

Abstract class to describe an acquisition step.

You have to override this class.

Attributes

stop_flag : boolean
if True, stop the daemon as soon as possible.
debug_mode_allowed : boolean
if True, the debug mode is allowed.
step_limit : int
maximum step number (to avoid some loops).

Constructor.

Expand source code
class AcquisitionMoveStep(AcquisitionStep):
    """Step which moves (or copies) each incoming file to a directory.

    The destination is either an absolute directory or a
    "plugin_name/step_name" target; the basename is built from a template.

    """

    def _init(self):
        """Validate the arguments and compute the step configuration.

        Raises:
            Exception: if dest_dir is unset or invalid, if drop_tags has
                an unknown value or if dest_basename is empty.

        """
        AcquisitionStep._init(self)
        dest_dir_arg = self.args.dest_dir
        if dest_dir_arg == "FIXME":
            raise Exception("dest_dir is not set")
        if '/' not in dest_dir_arg:
            raise Exception("invalid dest_dir: %s" % dest_dir_arg)
        is_absolute = dest_dir_arg.startswith('/')
        if is_absolute:
            self.dest_dir = dest_dir_arg
        else:
            # "plugin_name/step_name" form
            pieces = dest_dir_arg.split('/')
            self.dest_dir = get_plugin_step_directory_path(pieces[0],
                                                           pieces[1])
        mkdir_p_or_die(self.dest_dir)
        self.add_virtual_trace(self.dest_dir)
        # The drop-tags argument may not be declared (when
        # add_extra_arguments() is overridden): tags are dropped then.
        try:
            drop_tags_arg = self.args.drop_tags
        except AttributeError:
            self.drop_tags = True
        else:
            if drop_tags_arg == "AUTO":
                self.drop_tags = is_absolute
            elif drop_tags_arg == "1":
                self.drop_tags = True
            elif drop_tags_arg == "0":
                self.drop_tags = False
            else:
                raise Exception("invalid value for drop_tags configuration "
                                "key: %s" % drop_tags_arg)
        # Same thing for force-chmod: no chmod when it is not declared.
        try:
            chmod_arg = self.args.force_chmod
        except AttributeError:
            chmod_arg = ""
        self.force_chmod = "" if chmod_arg == "null" else chmod_arg
        basename_template = self.args.dest_basename
        if basename_template == "":
            raise Exception("dest_basename can't be empty")
        self.dest_basename = basename_template

    def add_dest_arguments(self, parser):
        """Declare the --dest-dir and --dest-basename CLI arguments."""
        parser.add_argument(
            '--dest-dir', action='store', default="FIXME",
            help='destination directory (can be an absolute '
            'path or something like "plugin_name/step_name")')
        parser.add_argument(
            '--dest-basename', action='store', default="{ORIGINAL_UID}",
            help='target basename (under dest_dir)')

    def add_force_chmod_argument(self, parser):
        """Declare the --force-chmod CLI argument."""
        parser.add_argument(
            '--force-chmod', action='store', default="",
            help='if non empty, force chmod on files after '
            'move with well known octal value '
            '(example : 0700)')

    def add_extra_arguments(self, parser):
        """Declare all the CLI arguments of the step."""
        self.add_dest_arguments(parser)
        parser.add_argument(
            '--drop-tags', action='store', default="AUTO",
            help='1 (yes), 0 (no) or '
            'AUTO (yes for absolute dest-dir)')
        self.add_force_chmod_argument(parser)

    def compute_basename(self, xaf):
        """Build the target basename from the dest_basename template.

        Placeholders ({RANDOM_ID}, {ORIGINAL_BASENAME}, {ORIGINAL_DIRNAME},
        {ORIGINAL_UID}, {STEP_COUNTER}) are replaced first; if the result
        contains a '%', it is then expanded with strftime() on the
        current local time.

        Args:
            xaf: file object being processed.

        Returns:
            The computed basename (string).

        """
        counter = self._get_counter_tag_value(xaf, not_found_value='999')
        substitutions = (
            ("{RANDOM_ID}", get_unique_hexa_identifier()),
            ("{ORIGINAL_BASENAME}", self.get_original_basename(xaf)),
            ("{ORIGINAL_DIRNAME}", self.get_original_dirname(xaf)),
            ("{ORIGINAL_UID}", self.get_original_uid(xaf)),
            ("{STEP_COUNTER}", str(counter)),
        )
        basename = self.dest_basename
        for placeholder, value in substitutions:
            basename = basename.replace(placeholder, value)
        if '%' not in basename:
            return basename
        return datetime.datetime.now().strftime(basename)

    def before_move(self, xaf):
        """Hook returning the full target path for this file.

        Returns:
            The target path; None or False makes process() stop without
            moving anything.

        """
        return os.path.join(self.dest_dir, self.compute_basename(xaf))

    def process(self, xaf):
        """Move (or copy) the file to the path given by before_move().

        Returns:
            True on success (or when before_move() returned None/False),
            False otherwise.

        """
        self.info("Processing file: %s" % xaf.filepath)
        try:
            new_filepath = self.before_move(xaf)
            if new_filepath is None or new_filepath is False:
                self.debug("before_move() returned None or False "
                           "=> we do nothing more")
                return True
        except Exception:
            self.exception("exception during before_move() => failure")
            return False
        # force_chmod holds an octal string ("" means: no chmod)
        fcmi = None if self.force_chmod == "" else int(self.force_chmod, 8)
        # Remember the source path now, to display it in the logs
        old_filepath = xaf.filepath
        if self.drop_tags:
            xaf.clear_tags()
        success, moved = xaf.move_or_copy(new_filepath,
                                          chmod_mode_int=fcmi)
        if not success:
            self.warning("Can't move/copy %s to %s", xaf.filepath,
                         new_filepath)
            return False
        if moved:
            self.info("%s moved into %s", old_filepath, new_filepath)
        else:
            self.info("%s copied into %s", xaf.filepath, new_filepath)
        return True

Ancestors

Subclasses

Methods

def add_dest_arguments(self, parser)
def add_force_chmod_argument(self, parser)
def before_move(self, xaf)
def compute_basename(self, xaf)

Inherited members

class AcquisitionReinjectStep

Class to describe a re-inject acquisition step.

Constructor.

Expand source code
class AcquisitionReinjectStep(AcquisitionStep):
    """
    Class to describe a re-inject acquisition step.

    Incoming files are kept aside and reinjected into a
    "plugin_name/step_name" target after a delay; once the maximum number
    of attempts is reached, the trash policy is invoked instead.
    """

    debug_mode_allowed = False

    def add_extra_arguments(self, parser):
        """Declare the destination and retry CLI arguments."""
        parser.add_argument('--dest-dir', action='store',
                            default="FIXME",
                            help='destination directory (can be an absolute '
                            'path or something like "plugin_name/step_name")')
        parser.add_argument('--retry-min-wait', action='store',
                            type=float, default=10)
        parser.add_argument('--retry-max-wait', action='store',
                            type=float, default=120)
        parser.add_argument('--retry-total', action='store',
                            type=int, default=10)
        parser.add_argument('--retry-backoff', action='store',
                            type=float, default=0)

    def _init(self):
        """Validate the arguments and prepare the destination directory.

        Raises:
            Exception: if dest-dir is unset or is not of the form
                "plugin_name/step_name".

        """
        AcquisitionStep._init(self)
        # filepath => (registration time, attempt number, file object)
        self.__xafs = {}
        if self.args.dest_dir is None:
            raise Exception('you have to set a dest-dir argument')
        if self.args.dest_dir == "FIXME":
            raise Exception('you have to set a dest-dir argument')
        if '/' not in self.args.dest_dir:
            raise Exception("invalid dest_dir: %s" % self.args.dest_dir)
        if self.args.dest_dir.startswith('/'):
            raise Exception("invalid dest_dir: %s, must be something like: "
                            "plugin_name/step_name" % self.args.dest_dir)
        plugin_name = self.args.dest_dir.split('/')[0]
        step_name = self.args.dest_dir.split('/')[1]
        self.add_virtual_trace(plugin_name, step_name)
        self.add_virtual_trace(">delete", "giveup")
        self.dest_dir = get_plugin_step_directory_path(plugin_name,
                                                       step_name)
        mkdir_p_or_die(self.dest_dir)
        self.retry_total = self.args.retry_total
        self.retry_min_wait = self.args.retry_min_wait
        self.retry_max_wait = self.args.retry_max_wait
        self.retry_backoff = self.args.retry_backoff

    def destroy(self):
        """Rename each pending file back and forth before stopping."""
        self.debug("destroy called")
        # we generate a IN_MOVE event on remaining files to generate new
        # events on redis (to be sure that remaining files will be checked
        # at startup)
        filepaths = list(self.__xafs.keys())
        for filepath in filepaths:
            try:
                xaf = self.__xafs[filepath][2]
            except KeyError:
                # The entry disappeared: nothing to do for this path (and
                # we must not reuse the file of a previous iteration).
                continue
            new_filepath = filepath + ".t"
            self.debug("move %s to %s" % (filepath, new_filepath))
            xaf.rename(new_filepath)
            xaf2 = XattrFile(new_filepath)
            self.debug("move %s to %s" % (new_filepath, filepath))
            xaf2.rename(filepath)

    def _before(self, xaf, **kwargs):
        """Register the file for a later reinjection.

        Returns:
            Always False (the file is deleted when retry_total <= 0,
            otherwise it is only registered and handled later by ping()).

        """
        if self.retry_total <= 0:
            self.info("retry_total <= 0 => let's delete the file")
            xaf.delete_or_nothing()
            return False
        if xaf.filepath not in self.__xafs:
            self._set_before_tags(xaf)
            retry_attempt = int(self.get_tag(xaf, "attempt", "0"))
            self.__xafs[xaf.filepath] = (time.time(), retry_attempt, xaf)
        return False

    def delay(self, attempt_number):
        """Return the delay (in seconds) to wait before a given attempt.

        The delay is retry_min_wait + retry_backoff * 2^(attempt_number-1),
        capped to retry_max_wait.

        Args:
            attempt_number (int): attempt number (first attempt is 1).

        Returns:
            The delay in seconds.

        """
        tmp = self.retry_min_wait + \
            self.retry_backoff * (2 ** (attempt_number - 1))
        if tmp > self.retry_max_wait:
            return self.retry_max_wait
        return tmp

    def reinject(self, xaf, retry_attempt):
        """Move the file into dest_dir and record the new attempt number.

        Args:
            xaf: file object to reinject.
            retry_attempt (int): number of attempts already done.

        """
        self._set_tag_latest(xaf, "attempt", str(retry_attempt + 1))
        filepath = xaf.filepath
        self.info("reinjecting %s into %s/... attempt %d", filepath,
                  self.dest_dir, retry_attempt + 1)
        new_filepath = os.path.join(self.dest_dir,
                                    get_unique_hexa_identifier())
        # NOTE(review): the result of move_or_copy() is not checked here.
        xaf.move_or_copy(new_filepath)
        self.add_trace(xaf, self.dest_dir)
        self.get_stats_client().incr("number_of_processed_files", 1)
        self.get_stats_client().incr("bytes_of_processed_files", xaf.getsize())

    def give_up(self, xaf):
        """Invoke the trash policy for a file which reached max attempts."""
        self.warning("max retry attempt for %s => invoking trash policy",
                     xaf.filepath)
        self.get_stats_client().incr("number_of_processing_errors", 1)
        self._trash(xaf)

    def ping(self):
        """Reinject (or give up on) the registered files which are due."""
        now = time.time()
        filepaths = list(self.__xafs.keys())
        for filepath in filepaths:
            try:
                if not os.path.exists(filepath):
                    # The file vanished: just forget it.
                    del self.__xafs[filepath]
                    continue
                mtime, retry_attempt, xaf = self.__xafs[filepath]
                if retry_attempt >= self.retry_total:
                    del self.__xafs[filepath]
                    self.give_up(xaf)
                    # The file went through the trash policy: it must not
                    # be reinjected as well.
                    continue
                delay = self.delay(retry_attempt + 1)
                if now - mtime >= delay:
                    self.reinject(xaf, retry_attempt)
                    del self.__xafs[filepath]
            except KeyError:
                pass

Ancestors

Class variables

var debug_mode_allowed

Methods

def delay(self, attempt_number)
def give_up(self, xaf)
def reinject(self, xaf, retry_attempt)

Inherited members

class AcquisitionStatsDClient (plugin_name, step_name, extra_tags={})
Expand source code
class AcquisitionStatsDClient(object):
    """Base class of the statsd-like clients used by acquisition steps.

    Every metric method (gauge, set, timing, incr, decr, timer, pipeline)
    raises NotImplementedError here. This class only provides the naming
    helpers: ``_stat()`` builds the full metric name from a prefix, the
    stat name and a cached suffix made of tags.

    Args:
        plugin_name (string): plugin name (added as a tag if not None).
        step_name (string): step name (added as a tag if not None).
        extra_tags (dict): extra tags added to the suffix (None: no tag).

    """

    plugin_name = None
    step_name = None
    __suffix_cache = None

    def __init__(self, plugin_name, step_name, extra_tags=None):
        self.plugin_name = plugin_name
        self.step_name = step_name
        # a `{}` default value would be shared between all the instances
        # built without extra tags => build a new dict for each of them
        self.extra_tags = {} if extra_tags is None else extra_tags

    def gauge(self, stat, value):
        raise NotImplementedError()

    def set(self, stat, value):
        raise NotImplementedError()

    def timing(self, stat, delta):
        raise NotImplementedError()

    def incr(self, stat, count):
        raise NotImplementedError()

    def decr(self, stat, count):
        raise NotImplementedError()

    def timer(self, stat):
        raise NotImplementedError()

    def pipeline(self):
        raise NotImplementedError()

    def _get_prefix(self):
        """Return the string put before the stat name (empty here)."""
        return ""

    def _get_suffix(self):
        """Return the comma separated tags put after the stat name.

        The suffix is computed once and then cached on the instance.

        """
        if self.__suffix_cache is None:
            # the leading empty item makes the suffix start with a comma
            tmp = ["", "module=%s" % MFMODULE.lower(), "host=%s" % HOSTNAME]
            if self.plugin_name is not None:
                tmp.append("plugin=%s" % self.plugin_name)
            if self.step_name is not None:
                tmp.append("step=%s" % self.step_name)
            for k, v in self.extra_tags.items():
                tmp.append("%s=%s" % (k, v))
            self.__suffix_cache = ",".join(tmp)
        return self.__suffix_cache

    def _stat(self, stat):
        """Return the full metric name: prefix + stat + suffix."""
        return "%s%s%s" % (self._get_prefix(), stat, self._get_suffix())

Subclasses

Class variables

var plugin_name
var step_name

Methods

def decr(self, stat, count)
def gauge(self, stat, value)
def incr(self, stat, count)
def pipeline(self)
def set(self, stat, value)
def timer(self, stat)
def timing(self, stat, delta)
class AcquisitionStep

Abstract class to describe an acquisition step.

You have to override this class.

Attributes

stop_flag : boolean
if True, stop the daemon as soon as possible.
debug_mode_allowed : boolean
if True, the debug mode is allowed.
step_limit : int
maximum step number (to avoid some loops).

Constructor.

Expand source code
class AcquisitionStep(AcquisitionBase):
    """Abstract class to describe an acquisition step.

    You have to override this class.

    Attributes:
        stop_flag (boolean): if True, stop the daemon as soon as possible.
        debug_mode_allowed (boolean): if True, the debug mode is allowed.
        step_limit (int): maximum step number (to avoid some loops).

    """

    # Set to True (SIGTERM handler, KeyboardInterrupt or fatal error in the
    # daemon loop) to make the daemon loop exit.
    stop_flag = False
    # If False, run() calls error_and_die() when started in debug mode.
    debug_mode_allowed = True
    # NOTE(review): not read in this class body; presumably used by the
    # unit tests machinery -- confirm against the base class.
    unit_tests = False
    unit_tests_args = None
    # Overwritten in _init() with the value of the --failure-policy argument.
    failure_policy = None
    # Maximum value of the counter tag accepted by _before().
    step_limit = DEFAULT_STEP_LIMIT
    # Naive UTC datetime of the latest _ping() call (None: no ping yet).
    __last_ping = None
    # Set to True when the step runs in debug mode.
    _debug_mode = False

    def _init(self):
        """Initialize the step (failure policy and SIGTERM handling).

        The failure policy is read from the command line arguments and
        checked. For the "move" policy, the destination (an absolute
        directory or something like plugin_name/step_name) is resolved and
        created. Finally the SIGTERM handler is installed.

        """
        super(AcquisitionStep, self)._init()
        policy = self.args.failure_policy
        self.failure_policy = policy
        self.failure_policy_move_dest_dir = None
        self.failure_policy_move_absolute_dir = False
        if policy not in ("keep", "delete", "move"):
            self.error_and_die("unknown failure policy: %s", policy)
        if policy == "move":
            dest = self.args.failure_policy_move_dest_dir
            if dest in (None, "", "FIXME"):
                self.error_and_die(
                    "you have to set a failure-policy-move-dest-dir"
                    " in case of move failure policy"
                )
            if "/" not in dest:
                self.error_and_die(
                    "failure-policy-move-dest-dir must be something like "
                    "plugin_name/step_name or an absolute path")
            if not dest.startswith('/'):
                # plugin_name/step_name form
                parts = dest.split('/')
                plugin_name, step_name = parts[0], parts[1]
                self.add_virtual_trace(plugin_name, step_name)
                self.failure_policy_move_dest_dir = \
                    get_plugin_step_directory_path(plugin_name, step_name)
                mkdir_p_or_die(self.failure_policy_move_dest_dir)
            else:
                # absolute directory form
                mkdir_p_or_die(dest)
                self.add_virtual_trace(dest)
                self.failure_policy_move_dest_dir = dest
                self.failure_policy_move_absolute_dir = True
        signal.signal(signal.SIGTERM, self.__sigterm_handler)

    def _add_extra_arguments_before(self, parser):
        """Declare the options shared by every step.

        These are the redis connection options and the failure policy
        options.

        Args:
            parser: an ArgumentParser object to complete.

        """
        # (option, default value, help message)
        common_options = (
            ("--redis-host", "127.0.0.1",
             "redis host to connect to (in daemon mode)"),
            ("--redis-port", 6379,
             "redis port to connect to (in daemon mode)"),
            ("--redis-unix-socket-path", None,
             "path to redis unix socket (overrides "
             "redis-host and redis-port if set)"),
            ("--failure-policy", "keep",
             "failure policy (keep, delete or move)"),
            ("--failure-policy-move-dest-dir", None,
             "dest-dir in case of move failure policy (must be something "
             "like plugin_name/step_name or an absolute dir path)"),
        )
        for option, default_value, help_message in common_options:
            parser.add_argument(
                option,
                action="store",
                default=default_value,
                help=help_message,
            )

    def _add_extra_arguments_after(self, parser):
        """Declare the positional argument (file path or queue name).

        Args:
            parser: an ArgumentParser object to complete.

        """
        positional_help = (
            "if starts with /, we consider we are "
            "in debug mode, if not, we consider we are in "
            "daemon mode"
        )
        parser.add_argument(
            "FULL_FILEPATH_OR_QUEUE_NAME", action="store", help=positional_help
        )

    def _exception_safe_call(
        self, func, args, kwargs, label, return_value_if_exception
    ):
        """Call func(*args, **kwargs) without letting an Exception escape.

        Args:
            func (callable): function to call.
            args (list): positional arguments of the call.
            kwargs (dict): keyword arguments of the call.
            label (string): description of the call (for the log message).
            return_value_if_exception: value returned if the call raised.

        Returns:
            the result of the call, or return_value_if_exception if an
            Exception was raised (it is logged in that case).

        """
        try:
            outcome = func(*args, **kwargs)
        except Exception:
            self.exception("exception during %s" % label)
            outcome = return_value_if_exception
        return outcome

    def _process(self, xaf, force_original_filepath=None):
        """Run the whole processing sequence on one file.

        Calls _before(), then process() (an exception during process() is
        logged and counts as a False status), then _after(). The processing
        timer and the statistics counters are updated along the way.

        Args:
            xaf (XattrFile): file to process.
            force_original_filepath (string): if set, path stored as the
                original filepath of the file (instead of xaf.filepath);
                it is the path used in the log messages.

        Returns:
            False if _before() returned False, else the value returned
            by _after().

        """
        if force_original_filepath is not None:
            xaf._original_filepath = force_original_filepath
        else:
            xaf._original_filepath = xaf.filepath
        self.info("Start the processing of %s...", xaf._original_filepath)
        timer = self.get_stats_client().timer("processing_file_timer")
        timer.start()
        before_status = self._before(xaf)
        if before_status is False:
            # the file was not processed at all: stop here, the timer is
            # stopped with send=False
            timer.stop(send=False)
            self.info(
                "End of the %s processing after %i ms "
                "(before_status=False)",
                xaf._original_filepath,
                timer.ms,
            )
            return False
        # the size is read before process() is called
        size = xaf.getsize()
        process_status = self._exception_safe_call(
            self.process,
            [xaf],
            {},
            "process of %s" % xaf._original_filepath,
            False,
        )
        after_status = self._after(xaf, process_status)
        self.get_stats_client().incr("number_of_processed_files", 1)
        self.get_stats_client().incr("bytes_of_processed_files", size)
        if not after_status:
            self.get_stats_client().incr("number_of_processing_errors", 1)
        timer.stop()
        self.info(
            "End of the %s processing after %i ms",
            xaf._original_filepath,
            timer.ms,
        )
        if not after_status:
            self.warning(
                "Bad processing status for file: %s", xaf._original_filepath
            )
            # dump the tags of the file only if the DEBUG level is enabled
            logger = self._get_logger()
            if logger.isEnabledFor(10):  # DEBUG
                xaf.dump_tags_on_logger(logger, 10)  # DEBUG
        return after_status

    def _before(self, xaf):
        """Prepare a file just before its processing.

        The file is renamed to a temporary path (named after the
        first.core.original_uid tag when it exists), the original uid and
        the "before" tags are set and the step counter is checked against
        step_limit.

        Args:
            xaf (XattrFile): file to prepare.

        Returns:
            boolean: False if the file can't be moved or if the step
                counter is bigger than step_limit, True otherwise.

        """
        uid_tag = "first.core.original_uid"
        if uid_tag not in xaf.tags:
            tmp_filepath = self.get_tmp_filepath()
            forced_uid = os.path.basename(tmp_filepath)
        else:
            forced_uid = xaf.tags[uid_tag].decode('utf8')
            tmp_filepath = self.get_tmp_filepath(forced_basename=forced_uid)
        self.info("Move %s to %s (to process it)", xaf.filepath, tmp_filepath)
        try:
            xaf.rename(tmp_filepath)
        except (IOError, OSError):
            self.debug(
                "Can't move %s to %s: file does not exist any more ?",
                xaf.filepath,
                tmp_filepath,
            )
            return False
        self._set_original_uid_if_necessary(xaf, forced_uid=forced_uid)
        xaf._before_process_filepath = xaf.filepath
        self._set_before_tags(xaf)
        limit_exceeded = self._get_counter_tag_value(xaf) > self.step_limit
        if not limit_exceeded:
            return True
        self.warning(
            "step_value bigger than step_limit [%i] => "
            "deleting file to avoid a recursion loop ?" % self.step_limit
        )
        return False

    def _trash(self, xaf):
        """Apply the configured failure policy to a file.

        - "delete": the file is deleted
        - "keep": the file is moved into the trash directory of the step,
          its tags are written in a ".tags" side file and then cleared
        - "move": the file is moved into failure_policy_move_dest_dir (for
          an absolute directory, tags are written in a ".tags" side file
          and cleared)

        If the move fails, the file is deleted.

        Args:
            xaf (XattrFile): file to trash.

        """
        policy = self.failure_policy
        if policy == "delete":
            xaf.delete_or_nothing()
            self.add_trace(xaf, ">trash", "delete")
            return
        if policy == "keep":
            self.add_trace(xaf, ">trash", "keep")
            trash_dir = _get_or_make_trash_dir(self.plugin_name,
                                               self.step_name)
            new_filepath = os.path.join(trash_dir, xaf.basename())
            success, _ = xaf.move_or_copy(new_filepath)
            if not success:
                xaf.delete_or_nothing()
                return
            xaf.write_tags_in_a_file(new_filepath + ".tags")
            xattrfile.XattrFile(new_filepath).clear_tags()
            return
        if policy == "move":
            new_filepath = os.path.join(
                self.failure_policy_move_dest_dir, xaf.basename()
            )
            success, _ = xaf.move_or_copy(new_filepath)
            if not success:
                xaf.delete_or_nothing()
                return
            if self.failure_policy_move_absolute_dir:
                xaf.write_tags_in_a_file(new_filepath + ".tags")
                xattrfile.XattrFile(new_filepath).clear_tags()
            self.add_trace(xaf, self.failure_policy_move_dest_dir)

    def _after(self, xaf, process_status):
        """Clean up after the processing of a file and call after().

        On success, the file is deleted if it is still at the path it had
        just before the processing. On failure, the file is given to the
        failure policy. Then the after() hook is called (its exceptions
        are caught).

        Args:
            xaf (XattrFile): processed file.
            process_status: status returned by process().

        Returns:
            the given process_status (unchanged).

        """
        if not process_status:
            self._trash(xaf)
        else:
            before_path = getattr(xaf, "_before_process_filepath", None)
            if before_path and before_path == xaf.filepath:
                xaf.delete_or_nothing()
        self._exception_safe_call(
            self.after, [process_status], {}, "after %s" % xaf.filepath, False
        )
        return process_status

    def _ping(self):
        """Record the ping time (naive UTC) and call the ping() hook.

        Exceptions raised by ping() are caught (and logged).

        """
        utc_now = datetime.now(timezone.utc)
        self.__last_ping = utc_now.replace(tzinfo=None)
        self._exception_safe_call(self.ping, [], {}, "ping", False)

    def __ping_if_needed(self):
        """Call _ping() if the latest ping is more than one second old.

        Nothing is done when stop_flag is set. If there was no ping yet,
        _ping() is called immediately.

        """
        if self.stop_flag:
            return
        previous_ping = self.__last_ping
        if previous_ping is None:
            return self._ping()
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        elapsed = (utc_now - previous_ping).total_seconds()
        if elapsed > 1:
            self._ping()

    def __run_in_daemon_mode(
        self, redis_host, redis_port, redis_queue, redis_unix_socket_path=None
    ):
        """Pop messages from a redis queue and process the matching files.

        Loops until stop_flag is set. Each popped message must be a JSON
        object with "directory" and "filename" keys; the corresponding file
        is given to _process().

        Args:
            redis_host (string): redis host (ignored if
                redis_unix_socket_path is set).
            redis_port: redis port (ignored if redis_unix_socket_path
                is set).
            redis_queue (string): name of the redis queue to pop.
            redis_unix_socket_path (string): path of the redis unix socket
                (if set, used instead of redis_host/redis_port).

        """
        self.info("Start the daemon mode with redis_queue=%s", redis_queue)
        if redis_unix_socket_path:
            r = redis.StrictRedis(unix_socket_path=redis_unix_socket_path)
        else:
            r = redis.StrictRedis(host=redis_host, port=redis_port)
        # number of consecutive redis connection errors
        redis_connection_exceptions_counter = 0
        while not self.stop_flag:
            self.__ping_if_needed()
            try:
                # NOTE(review): second argument is presumably a timeout in
                # seconds (see "brpop timeout" below), which lets the loop
                # check stop_flag and ping regularly -- confirm with redis-py
                msg = r.brpop(redis_queue, 1)
            except KeyboardInterrupt:
                self.stop_flag = True
                continue
            except redis.exceptions.ConnectionError:
                redis_connection_exceptions_counter = (
                    redis_connection_exceptions_counter + 1
                )
                if redis_connection_exceptions_counter >= 10:
                    # 10 consecutive failures (with a 1s sleep between
                    # them): give up and leave the loop
                    self.critical("Can't connect to redis after 10s => exit")
                    self.stop_flag = True
                    continue
                else:
                    self.debug(
                        "Can't connect to redis => "
                        "I will try again after 1s sleep"
                    )
                    time.sleep(1)
                    continue
            # the connection works: reset the consecutive errors counter
            redis_connection_exceptions_counter = 0
            if msg is None:
                # brpop timeout
                continue
            try:
                # msg[1] holds the raw (bytes) payload of the message
                decoded_msg = json.loads(msg[1].decode("utf8"))
                filepath = os.path.join(
                    decoded_msg["directory"], decoded_msg["filename"]
                )
            except Exception:
                self.warning(
                    "wrong message type popped on bus "
                    "=> stopping to force a restart"
                )
                self.stop_flag = True
                continue
            if not os.path.exists(filepath):
                self.debug(
                    "filepath: %s does not exist anymore " "=> ignoring event",
                    filepath,
                )
                continue
            xaf = xattrfile.XattrFile(filepath)
            # best effort bookkeeping in redis (a failure is only logged):
            # a counter of handled files and the timestamp of the latest one
            counter = "counter.%s.%s" % (self.plugin_name, self.step_name)
            latest = "latest.%s.%s" % (self.plugin_name, self.step_name)
            try:
                r.incr(counter)
            except Exception:
                self.warning("can't update redis counter: %s" % counter)
            try:
                r.set(latest, str(get_utc_unix_timestamp()))
            except Exception:
                self.warning("can't set latest timestamp: %s" % latest)
            self._process(xaf)
        self.debug("Stop to  brpop queue %s" % redis_queue)

    def __run_in_debug_mode(self, filepath):
        """Process a copy of the given file with the most verbose logging.

        The logger level is set to 0, a ping is forced and the file is
        copied to a temporary path: the original file is left untouched.

        Args:
            filepath (string): full path of the file to process.

        Returns:
            the status returned by _process().

        """
        self.info("Start the debug mode with filepath=%s", filepath)
        logger = self._get_logger()
        logger.setLevel(0)
        self._debug_mode = True
        self._ping()
        scratch_path = self.get_tmp_filepath()
        source_xaf = xattrfile.XattrFile(filepath)
        working_copy = source_xaf.copy(scratch_path)
        return self._process(working_copy, force_original_filepath=filepath)

    def get_stats_client(self, extra_tags=None):
        """Return a stats client for this plugin/step.

        Args:
            extra_tags (dict): extra tags for the stats client
                (None: no extra tag).

        Returns:
            the object returned by the module level get_stats_client()
            function for this plugin name, step name and tags.

        """
        if extra_tags is None:
            # a `{}` default value would be a single dict shared by all the
            # calls => give a fresh one to each call instead
            extra_tags = {}
        return get_stats_client(self.plugin_name, self.step_name, extra_tags)

    def get_plugin_directory_path(self):
        """Return the full path of the plugin directory.

        Returns:
            (string) MFMODULE_RUNTIME_HOME/var/plugins/{plugin_name}.

        """
        path_parts = (
            MFMODULE_RUNTIME_HOME, "var", "plugins", self.plugin_name,
        )
        return os.path.join(*path_parts)

    def __xxx_to_plugin_step_get_basename(self, xaf,
                                          keep_original_basename):
        """Compute the target basename for the *_to_plugin_step() methods.

        Args:
            xaf (XattrFile): file to consider.
            keep_original_basename (boolean or None): if True, the current
                basename of xaf is returned. If False, a new unique
                identifier is returned. If None, the value of the
                first.core.original_uid tag is returned when the tag exists
                (otherwise, it behaves like False).

        Returns:
            string: the basename to use.

        Raises:
            Exception: if keep_original_basename is not True, False or None.

        """
        if keep_original_basename is None:
            if "first.core.original_uid" in xaf.tags:
                return xaf.tags["first.core.original_uid"].decode('utf8')
            keep_original_basename = False
        if keep_original_basename is True:
            return xaf.basename()
        if keep_original_basename is False:
            return get_unique_hexa_identifier()
        # format the message here: Exception does not interpolate extra
        # arguments the way logging calls do
        raise Exception("invalid value for keep_original_basename: %s "
                        "(must be True, False or None)" %
                        (keep_original_basename,))

    def hardlink_to_plugin_step(self, xaf, plugin_name, step_name,
                                keep_original_basename=False,
                                info=False):
        """Hardlink (or copy) a XattrFile to another plugin/step.

        Args:
            xaf (XattrFile): XattrFile to hardlink.
            plugin_name (string): target plugin name.
            step_name (string): target step name.
            keep_original_basename (boolean or None): if True, keep the
                basename of xaf. If False, generate a new basename. If None,
                use the value of the first.core.original_uid tag as basename
                (if it exists).
            info (boolean): if True, log an INFO message on success.

        Returns:
            boolean: True if ok.

        """
        target_basename = \
            self.__xxx_to_plugin_step_get_basename(xaf, keep_original_basename)
        source_path = xaf.filepath
        destination = os.path.join(
            get_plugin_step_directory_path(plugin_name, step_name),
            target_basename,
        )
        result, hardlinked = xaf.hardlink_or_copy(destination)
        if not result:
            return result
        if info:
            template = (
                "File %s hardlinked to %s" if hardlinked
                else "File %s copied to %s (can't hardlink)"
            )
            self.info(template % (source_path, destination))
        self.add_trace(xaf, plugin_name, step_name)
        return result

    def move_to_plugin_step(self, xaf, plugin_name, step_name,
                            keep_original_basename=False,
                            info=False):
        """Move (or copy) a XattrFile to another plugin/step.

        Args:
            xaf (XattrFile): XattrFile to move.
            plugin_name (string): target plugin name.
            step_name (string): target step name.
            keep_original_basename (boolean or None): if True, keep the
                basename of xaf. If False, generate a new basename. If None,
                use the value of the first.core.original_uid tag as basename
                (if it exists).
            info (boolean): if True, log an INFO message on success.

        Returns:
            boolean: True if ok.

        """
        target_basename = \
            self.__xxx_to_plugin_step_get_basename(xaf, keep_original_basename)
        source_path = xaf.filepath
        destination = os.path.join(
            get_plugin_step_directory_path(plugin_name, step_name),
            target_basename,
        )
        result, moved = xaf.move_or_copy(destination)
        if not result:
            return result
        if info:
            template = (
                "File %s moved to %s" if moved
                else "File %s copied to %s (can't move)"
            )
            self.info(template % (source_path, destination))
        self.add_trace(xaf, plugin_name, step_name)
        return result

    def copy_to_plugin_step(self, xaf, plugin_name, step_name,
                            keep_original_basename=False, info=False):
        """Copy a XattrFile (with tags) to another plugin/step.

        Args:
            xaf (XattrFile): XattrFile to copy.
            plugin_name (string): target plugin name.
            step_name (string): target step name.
            keep_original_basename (boolean or None): if True, keep the
                basename of xaf. If False, generate a new basename. If None,
                use the value of the first.core.original_uid tag as basename
                (if it exists).
            info (boolean): if True, log an INFO message on success.

        Returns:
            boolean: True if ok.

        """
        target_basename = \
            self.__xxx_to_plugin_step_get_basename(xaf, keep_original_basename)
        source_path = xaf.filepath
        destination = os.path.join(
            get_plugin_step_directory_path(plugin_name, step_name),
            target_basename,
        )
        result = xaf.copy_or_nothing(destination)
        if not result:
            return result
        if info:
            self.info("File %s copied to %s" % (source_path, destination))
        self.add_trace(xaf, plugin_name, step_name)
        return result

    def process(self, xaf):
        """Process one file.

        Process one XattrFile. You have to override this method (unless
        your class inherits from BatchStep, see batch_process() in that
        case).

        Before the call, the file is moved into a temporary directory with
        a unique filename and its extended attributes are copied to it, so
        you can do what you want with it.

        If the method returns True:

        - the processing is considered to be ok
        - the file is deleted if necessary

        If the method doesn't return True:

        - the processing is considered to be not ok (a warning message is
            logged)
        - the file is given to the chosen failure policy

        Args:
            xaf : XattrFile object.

        Returns:
            boolean: processing status (True: ok, False: not ok)

        """
        message = "process() method must be overriden"
        raise NotImplementedError(message)

    def after(self, status):
        """Hook called after each process() execution.

        It is called even if an exception was raised during process().
        The default implementation does nothing.

        Args:
            status: the processing status.

        """
        return None

    def ping(self):
        """Hook called about every second, when possible.

        The call can be delayed by a long running process() call.
        The default implementation does nothing.

        """
        return None

    def add_extra_arguments(self, parser):
        """Add some extra arguments to the command line parsing.

        Override this method if your step needs its own arguments.
        The default implementation adds nothing.

        Args:
            parser: an ArgumentParser object (with default options added).

        """
        return None

    def run(self):
        """Initialize the step and run it in debug or daemon mode.

        If the FULL_FILEPATH_OR_QUEUE_NAME argument starts with "/", it is
        a file path and the step runs in debug mode on it. Otherwise, it is
        a redis queue name and the step runs in daemon mode.

        """
        self._init()
        target = self.args.FULL_FILEPATH_OR_QUEUE_NAME
        if not target.startswith("/"):
            self.__run_in_daemon_mode(
                self.args.redis_host,
                self.args.redis_port,
                target,
                self.args.redis_unix_socket_path,
            )
        else:
            if not self.debug_mode_allowed:
                self.error_and_die("debug mode is not allowed for this step")
            self.__run_in_debug_mode(target)
        self._destroy()

    def get_original_basename(self, xaf):
        """Return the original basename of the file.

        The original basename is the unmodified basename before step0
        execution. "unknown" is returned if we can't find the basename.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original basename.

        """
        key = self._get_original_basename_tag_name()
        raw_value = xaf.tags.get(key, b"unknown")
        return raw_value.decode("utf8")

    def get_original_uid(self, xaf):
        """Return the original uid of the file.

        The original uid is the first unique id given before step0
        execution. "unknown" is returned if we can't find the original uid.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original uid.

        """
        key = self._get_original_uid_tag_name()
        raw_value = xaf.tags.get(key, b"unknown")
        return raw_value.decode("utf8")

    def get_original_dirname(self, xaf):
        """Return the original dirname of the file.

        The original dirname is the unmodified dirname before step0
        execution. "unknown" is returned if we can't find the dirname.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original dirname.

        """
        key = self._get_original_dirname_tag_name()
        raw_value = xaf.tags.get(key, b"unknown")
        return raw_value.decode("utf8")

    def __sigterm_handler(self, *args):
        """Signal handler: ask the daemon loop to stop.

        Only sets stop_flag (after a debug message); the arguments given
        by the signal module are ignored.

        """
        self.debug(
            "SIGTERM signal handled => schedulling shutdown"
        )
        self.stop_flag = True

Ancestors

Subclasses

Class variables

var debug_mode_allowed
var failure_policy
var step_limit
var stop_flag
var unit_tests
var unit_tests_args

Methods

def add_extra_arguments(self, parser)

Add some extra arguments to the command line parsing.

If you have to add some, you have to override this method.

Args

parser
an ArgumentParser object (with default options added).
def after(self, status)

Method called after the process execution.

It's called even in case of exceptions during process.

def copy_to_plugin_step(self, xaf, plugin_name, step_name, keep_original_basename=False, info=False)

Copy a XattrFile (with tags) to another plugin/step.

Args

xaf : XattrFile
XattrFile to move.
plugin_name : string
plugin name.
step_name : string
step name.
keep_original_basename : boolean
if True, we keep the original basename of xaf. If False, we generate a new basename, If None, we use the value of first.core.original_uid tag as basename (if exists).
info : boolean
if true, add INFO log messages.

Returns

boolean
True if ok
def get_original_basename(self, xaf)

Return the original basename of the file.

The original basename is the not modified basename before step0 execution. "unknown" is returned if we can't find the basename.

Args

xaf : XattrFile
file to consider.

Returns

string
original basename.
def get_original_dirname(self, xaf)

Return the original dirname of the file.

The original dirname is the unmodified dirname before step0 execution. "unknown" is returned if we can't find the dirname.

Args

xaf : XattrFile
file to consider.

Returns

string
original dirname.
def get_original_uid(self, xaf)

Return the original uid of the file.

The original uid is the first unique id given before step0 execution. "unknown" is returned if we can't find the original uid.

Args

xaf : XattrFile
file to consider.

Returns

string
original uid.
def get_stats_client(self, extra_tags={})

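def hardlink_to_plugin_step(self, xaf, plugin_name, step_name, keep_original_basename=False, info=False)

Hardlink (or copy) a XattrFile to another plugin/step.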

Args

xaf : XattrFile
XattrFile to move.
plugin_name : string
plugin name.
step_name : string
step name.
keep_original_basename : boolean
if True, we keep the original basename of xaf. If False, we generate a new basename, If None, we use the value of first.core.original_uid tag as basename (if exists).
info : boolean
if true, add INFO log messages.

Returns

boolean
True if ok.
def move_to_plugin_step(self, xaf, plugin_name, step_name, keep_original_basename=False, info=False)

Move (or copy) a XattrFile to another plugin/step.

Args

xaf : XattrFile
XattrFile to move.
plugin_name : string
plugin name.
step_name : string
step name.
keep_original_basename : boolean
if True, we keep the original basename of xaf. If False, we generate a new basename, If None, we use the value of first.core.original_uid tag as basename (if exists).
info : boolean
if true, add INFO log messages.

Returns

boolean
True if ok.
def ping(self)

Do something every second if possible.

The call can be blocked by a long running process() call.

def process(self, xaf)

Process one file.

Process one XattrFile. You have to override this method (unless your class inherits from BatchStep, see batch_process() in that case).

The file is moved into a temporary directory before the call with an unique filename. Extended attributes are copied to it. So you can do what you want with it.

If the method returns True:

  • we consider that the processing is ok
  • the file is deleted if necessary

If the method doesn't return True:

  • we consider that the processing is not ok (a warning message is logged).
  • the file is given to the chosen failure policy.

Args

xaf : XattrFile object.

Returns

boolean
processing status (True: ok, False: not ok)
def run(self)

Inherited members

class AcquisitionTransformStep

Abstract class to describe an acquisition step.

You have to override this class.

Attributes

stop_flag : boolean
if True, stop the daemon as soon as possible.
debug_mode_allowed : boolean
if True, the debug mode is allowed.
step_limit : int
maximum step number (to avoid some loops).

Constructor.

Expand source code
class AcquisitionTransformStep(AcquisitionStep):
    """Acquisition step which transforms files and forwards the results.

    Override transform(): what it returns (None, a file path, a XattrFile
    or an iterable of them) is moved to the plugin/step given by the
    --dest-dir argument (something like plugin_name/step_name).

    """

    dest_dir_default = "FIXME"

    def _init(self):
        """Initialize the step and parse the dest-dir argument."""
        AcquisitionStep._init(self)
        dest_dir = self.args.dest_dir
        if dest_dir.startswith('/'):
            raise Exception("invalid dest_dir: %s, must be something like "
                            "plugin_name/step_name" % dest_dir)
        # exactly one "/" is required (so exactly two parts)
        parts = dest_dir.split('/')
        if len(parts) != 2:
            raise Exception("invalid dest_dir: %s" % dest_dir)
        self.dest_plugin_name, self.dest_step_name = parts
        self.add_virtual_trace(self.dest_plugin_name, self.dest_step_name)
        self.keep_transformed_basename = True

    def add_extra_arguments(self, parser):
        """Add the --dest-dir argument to the command line parsing."""
        parser.add_argument(
            '--dest-dir',
            action='store',
            default=self.dest_dir_default,
            help='destination directory (something like '
            '"plugin_name/step_name")',
        )

    def transform(self, xaf):
        """Transform the file (to override); by default return it as is."""
        return xaf

    def process_out(self, xaf, out):
        """Forward the output of transform() to the destination step.

        Args:
            xaf (XattrFile): the processed (input) file.
            out: None (nothing to do), a file path (str), a XattrFile or
                an iterable of such values.

        Returns:
            boolean: True if everything has been forwarded.

        Raises:
            Exception: if the type of out is not supported.

        """
        if out is None:
            return True
        if isinstance(out, (XattrFile, str)):
            if isinstance(out, str):
                # a file path: put the tags of the input file on it
                new_xaf = xaf.copy_tags_on(out)
            elif "first.core.original_basename" in out.tags:
                # we consider that original tags are already copied
                new_xaf = out
            else:
                new_xaf = xaf.copy_tags_on(out.filepath)
            return self.move_to_plugin_step(
                new_xaf, self.dest_plugin_name, self.dest_step_name,
                keep_original_basename=self.keep_transformed_basename,
                info=True)
        if isinstance(out, Iterable):
            # every item is processed, even after a failure (a list is
            # built on purpose to avoid the short-circuit of all())
            statuses = [self.process_out(xaf, item) for item in out]
            return all(statuses)
        raise Exception("wrong output type: %s for transform() call" %
                        type(out).__name__)

    def process(self, xaf):
        """Call transform() on the file and forward its output.

        Returns:
            boolean: False if transform() raised an exception, True if it
                returned None, else the status of process_out().

        """
        self.info("Processing file: %s" % xaf.filepath)
        try:
            transformed = self.transform(xaf)
        except Exception:
            self.exception("exception during transform() call")
            return False
        if transformed is not None:
            return self.process_out(xaf, transformed)
        self.debug("transform() returned None => we do nothing more")
        return True

Ancestors

Subclasses

Class variables

var dest_dir_default

Methods

def process_out(self, xaf, out)
def transform(self, xaf)

Inherited members