Module acquisition.reinject_step

Functions

def main()

Classes

class AcquisitionReinjectStep

Class to describe a re-inject acquisition step.

Constructor.

Expand source code
class AcquisitionReinjectStep(AcquisitionStep):
    """
    Class to describe a re-inject acquisition step.
    """

    debug_mode_allowed = False

    def add_extra_arguments(self, parser):
        parser.add_argument('--dest-dir', action='store',
                            default="FIXME",
                            help='destination directory (can be an absolute '
                            'path or something like "plugin_name/step_name")')
        parser.add_argument('--retry-min-wait', action='store',
                            type=float, default=10)
        parser.add_argument('--retry-max-wait', action='store',
                            type=float, default=120)
        parser.add_argument('--retry-total', action='store',
                            type=int, default=10)
        parser.add_argument('--retry-backoff', action='store',
                            type=float, default=0)

    def _init(self):
        AcquisitionStep._init(self)
        self.__xafs = {}
        if self.args.dest_dir is None:
            raise Exception('you have to set a dest-dir argument')
        if self.args.dest_dir == "FIXME":
            raise Exception('you have to set a dest-dir argument')
        if '/' not in self.args.dest_dir:
            raise Exception("invalid dest_dir: %s" % self.args.dest_dir)
        if self.args.dest_dir.startswith('/'):
            raise Exception("invalid dest_dir: %s, must be something like: "
                            "plugin_name/step_name" % self.args.dest_dir)
        plugin_name = self.args.dest_dir.split('/')[0]
        step_name = self.args.dest_dir.split('/')[1]
        self.add_virtual_trace(plugin_name, step_name)
        self.add_virtual_trace(">delete", "giveup")
        self.dest_dir = get_plugin_step_directory_path(plugin_name,
                                                       step_name)
        mkdir_p_or_die(self.dest_dir)
        self.retry_total = self.args.retry_total
        self.retry_min_wait = self.args.retry_min_wait
        self.retry_max_wait = self.args.retry_max_wait
        self.retry_backoff = self.args.retry_backoff

    def destroy(self):
        self.debug("destroy called")
        # we generate a IN_MOVE event on remaining files to generate new
        # events on redis (to be sure that remaining files will be checked
        # at startup)
        filepaths = list(self.__xafs.keys())
        for filepath in filepaths:
            try:
                xaf = self.__xafs[filepath][2]
            except KeyError:
                pass
            new_filepath = filepath + ".t"
            self.debug("move %s to %s" % (filepath, new_filepath))
            xaf.rename(new_filepath)
            xaf2 = XattrFile(new_filepath)
            self.debug("move %s to %s" % (new_filepath, filepath))
            xaf2.rename(filepath)

    def _before(self, xaf, **kwargs):
        if self.retry_total <= 0:
            self.info("retry_total <= 0 => let's delete the file")
            xaf.delete_or_nothing()
            return False
        if xaf.filepath not in self.__xafs:
            self._set_before_tags(xaf)
            retry_attempt = int(self.get_tag(xaf, "attempt", "0"))
            self.__xafs[xaf.filepath] = (time.time(), retry_attempt, xaf)
        return False

    def delay(self, attempt_number):
        tmp = self.retry_min_wait + \
            self.retry_backoff * (2 ** (attempt_number - 1))
        if tmp > self.retry_max_wait:
            return self.retry_max_wait
        return tmp

    def reinject(self, xaf, retry_attempt):
        self._set_tag_latest(xaf, "attempt", str(retry_attempt + 1))
        filepath = xaf.filepath
        self.info("reinjecting %s into %s/... attempt %d", filepath,
                  self.dest_dir, retry_attempt + 1)
        new_filepath = os.path.join(self.dest_dir,
                                    get_unique_hexa_identifier())
        xaf.move_or_copy(new_filepath)
        self.add_trace(xaf, self.dest_dir)
        self.get_stats_client().incr("number_of_processed_files", 1)
        self.get_stats_client().incr("bytes_of_processed_files", xaf.getsize())

    def give_up(self, xaf):
        self.warning("max retry attempt for %s => invoking trash policy",
                     xaf.filepath)
        self.get_stats_client().incr("number_of_processing_errors", 1)
        self._trash(xaf)

    def ping(self):
        now = time.time()
        filepaths = list(self.__xafs.keys())
        for filepath in filepaths:
            try:
                if not os.path.exists(filepath):
                    del self.__xafs[filepath]
                mtime, retry_attempt, xaf = self.__xafs[filepath]
                if retry_attempt >= self.retry_total:
                    del self.__xafs[filepath]
                    self.give_up(xaf)
                delay = self.delay(retry_attempt + 1)
                if now - mtime >= delay:
                    self.reinject(xaf, retry_attempt)
                    del self.__xafs[filepath]
            except KeyError:
                pass

Ancestors

Class variables

var debug_mode_allowed

Methods

def delay(self, attempt_number)
def give_up(self, xaf)
def reinject(self, xaf, retry_attempt)

Inherited members