Source code for acquisition.batch_step

import datetime

from acquisition import AcquisitionStep
from mfutil import get_unique_hexa_identifier


class AcquisitionBatch(object):
    """Accumulator of files waiting to be processed together.

    A batch is considered ready as soon as it holds at least ``max_size``
    items or as soon as its age (seconds elapsed since the first item was
    appended) reaches ``max_wait``.

    Attributes:
        id: identifier of the batch, as returned by
            get_unique_hexa_identifier().
        max_size: number of items from which the batch is ready.
        max_wait: age (in seconds) from which the batch is ready.
        _start: naive UTC datetime of the first append() call
            (None while the batch is empty).
        _xafs (list): items appended so far, in insertion order.

    """

    id = None
    _start = None
    _xafs = None
    max_size = None
    max_wait = None

    def __init__(self, max_size, max_wait):
        """Create an empty batch with the given size/age thresholds."""
        self.max_size = max_size
        self.max_wait = max_wait
        self._xafs = []
        self.id = get_unique_hexa_identifier()

    def append(self, xaf):
        """Add an item to the batch.

        The age clock starts at the first call and is never reset by
        later calls.
        """
        if self._start is None:
            self._start = datetime.datetime.utcnow()
        self._xafs.append(xaf)

    def get_size(self):
        """Return the number of items currently held by the batch."""
        return len(self._xafs)

    def get_age(self):
        """Return the seconds elapsed since the first append().

        Returns 0 while nothing has been appended yet.
        """
        started_at = self._start
        if started_at is None:
            return 0
        return (datetime.datetime.utcnow() - started_at).total_seconds()

    def is_ready(self):
        """Tell whether the size threshold or the age threshold is reached."""
        return (self.get_size() >= self.max_size
                or self.get_age() >= self.max_wait)

    def get_xafs(self):
        """Return the internal list of items (not a copy)."""
        return self._xafs


class AcquisitionBatchStep(AcquisitionStep):
    """Abstract class to describe a batch acquisition step.

    You have to override this class (at least the batch_process() method).

    Incoming files are not processed one by one: they are accumulated in an
    AcquisitionBatch and batch_process() is called with the whole list when
    the batch is ready (size or age threshold reached) or when the
    processing is forced.

    Attributes:
        _batch (AcquisitionBatch): current AcquisitionBatch object
            (lazily created, see the _batch property).

    """

    # Backing field of the _batch property (name-mangled, so private to
    # this class); None means "no current batch yet".
    __batch = None

    def __init__(self):
        super(AcquisitionBatchStep, self).__init__()

    @property
    def batch_process_max_size(self):
        """Return the max size of a batch in batch process mode.

        If not overridden, the default value is 100.

        """
        return 100

    @property
    def batch_process_max_wait(self):
        """Max wait (in seconds) to fill the batch in batch process mode.

        If not overridden, the default value is 10 (so 10 seconds).

        """
        return 10

    @property
    def _batch(self):
        """Return the current AcquisitionBatch, creating it if necessary."""
        # Lazy init of AcquisitionBatch (to be able to have dynamic
        # batch_process_max_size/batch_process_max_wait)
        if self.__batch is None:
            self.__batch = AcquisitionBatch(self.batch_process_max_size,
                                            self.batch_process_max_wait)
        return self.__batch

    def _reinit_batch(self):
        """Reinit the current batch.

        The next access to the _batch property creates a fresh batch.
        """
        self.__batch = None

    def process(self, xaf):
        """Not available in batch process mode.

        Raises:
            NotImplementedError: always (override batch_process() instead).

        """
        raise NotImplementedError("process() method should not be called in "
                                  "batch process mode")

    def batch_process(self, xafs):
        """Process a batch of files.

        Process several XattrFile in one call (the batch is handed over
        once it reaches batch_process_max_size files, once it is older than
        batch_process_max_wait seconds, or when processing is forced).

        You have to override this method.

        Files are moved into a temporary directory before the call with
        unique filenames. Extended attributes are copied to them.
        So you can do what you want with them.

        If the method returns True:

        - we consider that the processing is ok
        - all files are deleted if necessary

        If the method returns False:

        - we consider that the result is False for each file
        - so each file follows the failure policy

        If the method returns an array of booleans (of the same size as
        the xafs array), we consider it as the processing status of each
        file. An array of a different size is treated as a failure for
        every file of the batch.

        Args:
            xafs (list): XattrFile objects (list of files to process).

        Returns:
            Processing status (True, False or array of booleans).

        Raises:
            NotImplementedError: if the method is not overridden.

        """
        raise NotImplementedError("batch_process() method must be overriden "
                                  "in \"batch process mode\"")

    def _destroy(self):
        """Flush the pending batch (if any), then destroy the step."""
        try:
            if self._batch.get_size():
                # we have a last batch to process
                # let's process it by force
                self._conditional_process_batch(force=True)
        except TypeError:
            # Deliberate best-effort flush kept from the original code.
            # NOTE(review): why a TypeError is expected here is not visible
            # from this class - confirm before narrowing or removing.
            pass
        super(AcquisitionBatchStep, self)._destroy()

    def _ping(self, *args, **kwargs):
        """Process the batch if it became ready, then forward the ping."""
        self._conditional_process_batch()
        super(AcquisitionBatchStep, self)._ping(*args, **kwargs)

    def _is_batch_ready(self, force=False):
        """Tell whether the current batch must be processed now.

        Args:
            force (bool): if True, a non-empty batch is always ready.

        Returns:
            bool: True if forced with a non-empty batch, otherwise the
            result of AcquisitionBatch.is_ready().

        """
        if force and self._batch.get_size() > 0:
            return True
        return self._batch.is_ready()

    def _before(self, xaf):
        """Register an incoming file in the current batch.

        The file is appended to the batch and tagged with the batch id,
        but only if the parent _before() accepted it.

        Returns:
            bool: always False.
            NOTE(review): presumably this tells the base step that the
            file must not be processed individually - confirm against
            AcquisitionStep.

        """
        status = super(AcquisitionBatchStep, self)._before(xaf)
        if not status:
            return False
        self._batch.append(xaf)
        self.set_tag(xaf, "batch_id", self._batch.id, add_latest=False)
        self.info("File %s added in batch %s", xaf._original_filepath,
                  self._batch.id)
        return False

    def _conditional_process_batch(self, force=False):
        """Process and reinit the current batch if it is ready.

        Args:
            force (bool): process a non-empty batch even if neither
                threshold is reached (self._debug_mode has the same
                effect).

        """
        if self._is_batch_ready(force or self._debug_mode):
            self.debug("Batch %s is ready (size: %i, age: %s seconds) => "
                       "let's process it", self._batch.id,
                       self._batch.get_size(), self._batch.get_age())
            self._process_batch()
            self._reinit_batch()
        else:
            if self._batch.get_size() > 0:
                self.debug("Batch %s is not ready (size: %i, "
                           "age: %s seconds)", self._batch.id,
                           self._batch.get_size(), self._batch.get_age())

    def _process(self, xaf):
        """Run the parent _process(), then process the batch if ready."""
        super(AcquisitionBatchStep, self)._process(xaf)
        self._conditional_process_batch()

    def _process_batch(self):
        """Call batch_process() on the current batch and record statistics.

        The batch itself is not reinitialized here (see
        _conditional_process_batch()).
        """
        self.info("Start the processing of batch %s...", self._batch.id)
        timer = self.get_stats_client().timer("processing_batch_timer")
        timer.start()
        xafs = self._batch.get_xafs()
        # The total size is computed before batch_process() is called
        # (same order as the original code).
        size = sum(x.getsize() for x in xafs)
        process_status = \
            self._exception_safe_call(self.batch_process, [xafs], {},
                                      "batch_process", False)
        after_status = self._after_batch(xafs, process_status)
        self.get_stats_client().incr("number_of_processed_files", len(xafs))
        self.get_stats_client().incr("bytes_of_processed_files", size)
        self.get_stats_client().incr("number_of_processed_batches", 1)
        timer.stop()
        self.info("End of the processing of batch %s...", self._batch.id)
        if not after_status:
            self.get_stats_client().incr("number_of_processed_"
                                         "batches_error", 1)
            self.warning("Bad processing status for batch: %s",
                         self._batch.id)

    def _after_batch(self, xafs, process_status):
        """Apply the parent _after() to each file of a processed batch.

        Args:
            xafs (list): files of the batch.
            process_status: value returned by batch_process(): None
                (treated as False), a single boolean applied to every
                file, or a sequence holding one status per file.

        Returns:
            A falsy value if at least one file is considered as failed
            (including the case of a status sequence whose length does
            not match the number of files), a truthy value otherwise.

        """
        if process_status is None:
            process_status = False
        if isinstance(process_status, bool):
            for xaf in xafs:
                super(AcquisitionBatchStep, self)._after(xaf, process_status)
            return process_status
        if len(process_status) != len(xafs):
            self.warning("bad process status len(process_status) = %i "
                         "which is different than len(xafs) = %i" %
                         (len(process_status), len(xafs)))
            # The statuses cannot be mapped to files: every file of the
            # batch is considered as failed.
            for xaf in xafs:
                super(AcquisitionBatchStep, self)._after(xaf, False)
            return False
        for (xaf, pstatus) in zip(xafs, process_status):
            super(AcquisitionBatchStep, self)._after(xaf, pstatus)
        # all() instead of min(): same truthiness for a non-empty list,
        # and no ValueError on an empty one.
        return all(process_status)