from __future__ import print_function
import os
import xattrfile
import redis
import json
import datetime
import time
import signal
from acquisition.base import AcquisitionBase
from mfutil import mkdir_p_or_die, get_unique_hexa_identifier
from mfutil import get_utc_unix_timestamp
from acquisition.utils import (
    get_plugin_step_directory_path,
    MFMODULE_RUNTIME_HOME,
    _get_or_make_trash_dir,
)
from acquisition.stats import get_stats_client
# Default maximum step number a file may reach (used to break loops).
DEFAULT_STEP_LIMIT = 1000
# Default redis connection settings; the port can be overridden with the
# MFDATA_REDIS_PORT environment variable.
DEFAULT_REDIS_HOST = "127.0.0.1"
DEFAULT_REDIS_PORT = int(os.getenv("MFDATA_REDIS_PORT", "1234"))
class AcquisitionStep(AcquisitionBase):
    """Abstract class to describe an acquisition step.

    You have to override this class (at least the process() method).

    A step runs either in "daemon mode" (it pops file events from a redis
    queue and processes each referenced file) or in "debug mode" (it
    processes a copy of a single file given on the command line).

    Attributes:
        stop_flag (boolean): if True, stop the daemon as soon as possible.
        debug_mode_allowed (boolean): if True, the debug mode is allowed.
        failure_policy (string): failure policy ("move", "delete" or "keep").
        failure_policy_move_dest_dir (string): destination directory when
            failure policy is move.
        failure_policy_move_keep_tags (boolean): keep tags into another file
            when failure policy is move.
        failure_policy_move_keep_tags_suffix (string): suffix to add to the
            filename to keep tags when failure policy is move.
        step_limit (int): maximum step number (to avoid some loops).
    """

    stop_flag = False
    debug_mode_allowed = True
    unit_tests = False
    unit_tests_args = None
    failure_policy = None
    failure_policy_move_dest_dir = None
    failure_policy_move_keep_tags = True
    failure_policy_move_keep_tags_suffix = None
    step_limit = DEFAULT_STEP_LIMIT
    # Private (name-mangled) UTC datetime of the latest _ping() call.
    __last_ping = None
    _debug_mode = False

    def _init(self):
        """Initialize the step from the parsed command line arguments.

        Validates the failure policy (calling error_and_die() on unknown
        values), creates the destination directory for the "move" policy
        and installs a SIGTERM handler that schedules a graceful shutdown.
        """
        super(AcquisitionStep, self)._init()
        self.failure_policy = self.args.failure_policy
        if self.failure_policy not in ("keep", "delete", "move"):
            self.error_and_die(
                "unknown failure policy: %s", self.failure_policy
            )
        if self.failure_policy == "move":
            fpmdd = self.args.failure_policy_move_dest_dir
            if fpmdd is None:
                self.error_and_die(
                    "you have to set a "
                    "failure-policy-move-dest-dir"
                    " in case of move failure policy"
                )
            mkdir_p_or_die(fpmdd)
            # Keep every move-policy attribute in sync with the CLI values
            # (previously only the two "keep tags" ones were set).
            self.failure_policy_move_dest_dir = fpmdd
            self.failure_policy_move_keep_tags = (
                self.args.failure_policy_move_keep_tags
            )
            self.failure_policy_move_keep_tags_suffix = (
                self.args.failure_policy_move_keep_tags_suffix
            )
        signal.signal(signal.SIGTERM, self.__sigterm_handler)

    def _add_extra_arguments_before(self, parser):
        """Register redis and failure-policy options on the argparse parser.

        Args:
            parser: argparse parser to extend.
        """
        parser.add_argument(
            "--redis-host",
            action="store",
            default=DEFAULT_REDIS_HOST,
            help="redis host to connect to (in daemon mode)",
        )
        # type=int so that a CLI-supplied port has the same type as the
        # default value.
        # NOTE(review): the module-level DEFAULT_REDIS_PORT is not used here;
        # 6379 is kept to preserve the existing default - confirm intent.
        parser.add_argument(
            "--redis-port",
            action="store",
            type=int,
            default=6379,
            help="redis port to connect to (in daemon mode)",
        )
        parser.add_argument(
            "--redis-unix-socket-path",
            action="store",
            default=None,
            help="path to redis unix socket (overrides "
            "redis-host and redis-port if set)",
        )
        parser.add_argument(
            "--failure-policy",
            action="store",
            default="keep",
            help="failure policy (keep, delete or move)",
        )
        parser.add_argument(
            "--failure-policy-move-dest-dir",
            action="store",
            default=None,
            help="dest-dir in case of move failure policy",
        )
        parser.add_argument(
            "--failure-policy-move-keep-tags",
            action="store",
            type=int,
            default=1,
            help="keep tags into another file in case of "
            "move failure policy ?",
        )
        parser.add_argument(
            "--failure-policy-move-keep-tags-suffix",
            action="store",
            default=".tags",
            help="suffix to add to the filename in case of "
            "move failure policy keep tags",
        )

    def _add_extra_arguments_after(self, parser):
        """Register the positional file path / queue name argument."""
        parser.add_argument(
            "FULL_FILEPATH_OR_QUEUE_NAME",
            action="store",
            help="if starts with /, we consider we are "
            "in debug mode, if not, we consider we are in "
            "daemon mode",
        )

    def _exception_safe_call(
        self, func, args, kwargs, label, return_value_if_exception
    ):
        """Call func(*args, **kwargs), logging and absorbing any Exception.

        Args:
            func: callable to invoke.
            args (list): positional arguments for func.
            kwargs (dict): keyword arguments for func.
            label (string): description used in the exception log message.
            return_value_if_exception: value returned when func raises.

        Returns:
            func's return value, or return_value_if_exception on error.
        """
        try:
            return func(*args, **kwargs)
        except Exception:
            self.exception("exception during %s" % label)
            return return_value_if_exception

    def _process(self, xaf, force_original_filepath=None):
        """Run the full before / process / after cycle on one file.

        Args:
            xaf (XattrFile): file to process.
            force_original_filepath (string): if set, used as the "original"
                filepath in logs instead of xaf.filepath.

        Returns:
            boolean: False if the file could not be taken (before step),
            otherwise the processing status returned by process().
        """
        if force_original_filepath is not None:
            xaf._original_filepath = force_original_filepath
        else:
            xaf._original_filepath = xaf.filepath
        self.info("Start the processing of %s...", xaf._original_filepath)
        timer = self.get_stats_client().timer("processing_file_timer")
        timer.start()
        before_status = self._before(xaf)
        if before_status is False:
            timer.stop(send=False)
            self.info(
                "End of the %s processing after %i ms "
                "(before_status=False)",
                xaf._original_filepath,
                timer.ms,
            )
            return False
        size = xaf.getsize()
        process_status = self._exception_safe_call(
            self.process,
            [xaf],
            {},
            "process of %s" % xaf._original_filepath,
            False,
        )
        after_status = self._after(xaf, process_status)
        self.get_stats_client().incr("number_of_processed_files", 1)
        self.get_stats_client().incr("bytes_of_processed_files", size)
        if not after_status:
            self.get_stats_client().incr("number_of_processing_errors", 1)
        timer.stop()
        self.info(
            "End of the %s processing after %i ms",
            xaf._original_filepath,
            timer.ms,
        )
        if not after_status:
            self.warning(
                "Bad processing status for file: %s", xaf._original_filepath
            )
            logger = self._get_logger()
            if logger.isEnabledFor(10):  # DEBUG
                xaf.dump_tags_on_logger(logger, 10)  # DEBUG
        return after_status

    def _before(self, xaf):
        """Move the file to a temporary path and set the "before" tags.

        Args:
            xaf (XattrFile): file to take (renamed in place).

        Returns:
            boolean: True if the file can be processed, False if it vanished
            or if its step counter exceeds step_limit (in that case the file
            is deleted).
        """
        tmp_filepath = self.get_tmp_filepath()
        self.info("Move %s to %s (to process it)", xaf.filepath, tmp_filepath)
        try:
            xaf.rename(tmp_filepath)
        except (IOError, OSError):
            self.debug(
                "Can't move %s to %s: file does not " "exist any more ?",
                xaf.filepath,
                tmp_filepath,
            )
            return False
        xaf._before_process_filepath = xaf.filepath
        self._set_before_tags(xaf)
        if self._get_counter_tag_value(xaf) > self.step_limit:
            self.warning(
                "step_value bigger than step_limit [%i] => "
                "deleting file to avoid a recursion loop ?",
                self.step_limit,
            )
            # Actually delete it, as announced: _process() returns right away
            # on False, so nothing else would remove it from the tmp dir.
            xaf.delete_or_nothing()
            return False
        return True

    def _trash(self, xaf):
        """Apply the configured failure policy to a badly processed file.

        - "delete": the file is deleted.
        - "keep": the file is moved/copied to the step trash directory, its
          tags are dumped into a ".tags" file and cleared on the file.
        - "move": the file is moved/copied to the failure-policy-move-dest-dir
          directory, with optional tags dump (configurable suffix).
        If the move/copy fails, the file is deleted.
        """
        if self.failure_policy == "delete":
            xaf.delete_or_nothing()
        elif self.failure_policy == "keep":
            new_filepath = os.path.join(
                _get_or_make_trash_dir(self.plugin_name, self.step_name),
                xaf.basename(),
            )
            success, _ = xaf.move_or_copy(new_filepath)
            if success:
                xaf.write_tags_in_a_file(new_filepath + ".tags")
                xattrfile.XattrFile(new_filepath).clear_tags()
            else:
                xaf.delete_or_nothing()
        elif self.failure_policy == "move":
            new_filepath = os.path.join(
                self.args.failure_policy_move_dest_dir, xaf.basename()
            )
            success, _ = xaf.move_or_copy(new_filepath)
            if success:
                if self.failure_policy_move_keep_tags:
                    suffix = self.failure_policy_move_keep_tags_suffix
                    xaf.write_tags_in_a_file(new_filepath + suffix)
                    xattrfile.XattrFile(new_filepath).clear_tags()
            else:
                xaf.delete_or_nothing()

    def _after(self, xaf, process_status):
        """Clean up after process() and call the after() hook.

        On success, the file is deleted only if process() left it at the
        temporary path set by _before(). On failure, the failure policy is
        applied.

        Returns:
            boolean: process_status, unchanged.
        """
        if process_status:
            before_filepath = getattr(xaf, "_before_process_filepath", None)
            if before_filepath and before_filepath == xaf.filepath:
                xaf.delete_or_nothing()
        else:
            self._trash(xaf)
        self._exception_safe_call(
            self.after, [process_status], {}, "after %s" % xaf.filepath, False
        )
        return process_status

    def _ping(self):
        """Record the ping time and call the (exception-safe) ping() hook."""
        self.__last_ping = datetime.datetime.utcnow()
        self._exception_safe_call(self.ping, [], {}, "ping", False)

    def __ping_if_needed(self):
        """Call _ping() if the last ping is older than one second."""
        if self.stop_flag:
            return
        if self.__last_ping is None:
            return self._ping()
        delta = (datetime.datetime.utcnow() - self.__last_ping).total_seconds()
        if delta > 1:
            self._ping()

    def __run_in_daemon_mode(
        self, redis_host, redis_port, redis_queue, redis_unix_socket_path=None
    ):
        """Pop JSON file events from a redis list and process them.

        Each message is expected to be JSON with "directory" and "filename"
        keys. The loop exits when stop_flag is set (SIGTERM, Ctrl-C, bad
        message, or 10 consecutive redis connection errors).
        """
        self.info("Start the daemon mode with redis_queue=%s", redis_queue)
        if redis_unix_socket_path:
            r = redis.StrictRedis(unix_socket_path=redis_unix_socket_path)
        else:
            r = redis.StrictRedis(host=redis_host, port=redis_port)
        # Loop-invariant redis key names (monitoring counters).
        counter = "counter.%s.%s" % (self.plugin_name, self.step_name)
        latest = "latest.%s.%s" % (self.plugin_name, self.step_name)
        redis_connection_exceptions_counter = 0
        while not self.stop_flag:
            self.__ping_if_needed()
            try:
                # 1s timeout so that ping() and stop_flag are checked often.
                msg = r.brpop(redis_queue, 1)
            except KeyboardInterrupt:
                self.stop_flag = True
                continue
            except redis.exceptions.ConnectionError:
                redis_connection_exceptions_counter += 1
                if redis_connection_exceptions_counter >= 10:
                    self.critical("Can't connect to redis after 10s => exit")
                    self.stop_flag = True
                else:
                    self.debug(
                        "Can't connect to redis => "
                        "I will try again after 1s sleep"
                    )
                    time.sleep(1)
                continue
            redis_connection_exceptions_counter = 0
            if msg is None:
                # brpop timeout
                continue
            try:
                decoded_msg = json.loads(msg[1].decode("utf8"))
                filepath = os.path.join(
                    decoded_msg["directory"], decoded_msg["filename"]
                )
            except Exception:
                self.warning(
                    "wrong message type popped on bus "
                    "=> stopping to force a restart"
                )
                self.stop_flag = True
                continue
            if not os.path.exists(filepath):
                self.debug(
                    "filepath: %s does not exist anymore " "=> ignoring event",
                    filepath,
                )
                continue
            xaf = xattrfile.XattrFile(filepath)
            try:
                r.incr(counter)
            except Exception:
                self.warning("can't update redis counter: %s" % counter)
            try:
                r.set(latest, str(get_utc_unix_timestamp()))
            except Exception:
                self.warning("can't set latest timestamp: %s" % latest)
            self._process(xaf)
        self.debug("Stop to  brpop queue %s" % redis_queue)

    def __run_in_debug_mode(self, filepath):
        """Process a temporary copy of filepath with full logging.

        The original file is copied (not moved), so it is left in place.

        Returns:
            boolean: the processing status returned by _process().
        """
        self.info("Start the debug mode with filepath=%s", filepath)
        self._get_logger().setLevel(0)
        self._debug_mode = True
        self._ping()
        tmp_filepath = self.get_tmp_filepath()
        original_xaf = xattrfile.XattrFile(filepath)
        xaf = original_xaf.copy(tmp_filepath)
        return self._process(xaf, force_original_filepath=filepath)

    def get_stats_client(self, extra_tags=None):
        """Return a stats client tagged with this plugin/step.

        Args:
            extra_tags (dict): additional tags (default: no extra tag).
        """
        # None instead of a shared mutable {} default argument.
        if extra_tags is None:
            extra_tags = {}
        return get_stats_client(self.plugin_name, self.step_name, extra_tags)

    def get_plugin_directory_path(self):
        """Return the plugin directory (fullpath).

        Returns:
            (string) the fullpath of the plugin directory.
        """
        return os.path.join(
            MFMODULE_RUNTIME_HOME, "var", "plugins", self.plugin_name
        )

    def move_to_plugin_step(self, xaf, plugin_name, step_name,
                            keep_original_basename=False):
        """Move a XattrFile to another plugin/step.

        Args:
            xaf (XattrFile): XattrFile to move.
            plugin_name (string): plugin name.
            step_name (string): step name.
            keep_original_basename (boolean): if True, we keep the original
                basename of xaf (else a unique hexa identifier is used).

        Returns:
            boolean: True if ok.
        """
        if keep_original_basename:
            basename = xaf.basename()
        else:
            basename = get_unique_hexa_identifier()
        target_path = os.path.join(
            get_plugin_step_directory_path(plugin_name, step_name),
            basename,
        )
        result, _ = xaf.move_or_copy(target_path)
        return result

    def copy_to_plugin_step(self, xaf, plugin_name, step_name,
                            keep_original_basename=False):
        """Copy a XattrFile (with tags) to another plugin/step.

        Args:
            xaf (XattrFile): XattrFile to copy.
            plugin_name (string): plugin name.
            step_name (string): step name.
            keep_original_basename (boolean): if True, we keep the original
                basename of xaf (else a unique hexa identifier is used).

        Returns:
            boolean: True if ok
        """
        if keep_original_basename:
            basename = xaf.basename()
        else:
            basename = get_unique_hexa_identifier()
        target_path = os.path.join(
            get_plugin_step_directory_path(plugin_name, step_name),
            basename,
        )
        result = xaf.copy_or_nothing(target_path)
        return result

    def process(self, xaf):
        """Process one file.

        Process one XattrFile. You have to override this method (unless your
        class inherits from BatchStep, see batch_process() in that case).

        The file is moved into a temporary directory before the call with
        an unique filename. Extended attributes are copied to it.
        So you can do what you want with it.

        If the method returns True:
        - we consider that the processing is ok
        - the file is deleted if necessary

        If the method doesn't return True:
        - we consider that the processing is not ok (a warning message is
            logged).
        - the file is given to the chosen failure policy.

        Args:
            xaf : XattrFile object.

        Returns:
            boolean: processing status (True: ok, False: not ok)
        """
        raise NotImplementedError("process() method must be overriden")

    def after(self, status):
        """Method called after the process execution.

        It's called even in case of exceptions during process().

        Args:
            status (boolean): the processing status.
        """
        return

    def ping(self):
        """Do something every second if possible.

        The call can be blocked by a long running process() call.
        """
        return

    def run(self):
        """Entry point: debug mode if the argument starts with "/", else
        daemon mode on the named redis queue."""
        self._init()
        if self.args.FULL_FILEPATH_OR_QUEUE_NAME.startswith("/"):
            if not self.debug_mode_allowed:
                self.error_and_die("debug mode is not allowed for this step")
            self.__run_in_debug_mode(self.args.FULL_FILEPATH_OR_QUEUE_NAME)
        else:
            self.__run_in_daemon_mode(
                self.args.redis_host,
                self.args.redis_port,
                self.args.FULL_FILEPATH_OR_QUEUE_NAME,
                self.args.redis_unix_socket_path,
            )
        self._destroy()

    def get_original_basename(self, xaf):
        """Return the original basename of the file.

        The original basename is the not modified basename before step0
        execution. "unknown" is returned if we can't find the basename.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original basename.
        """
        tag_name = self._get_original_basename_tag_name()
        return xaf.tags.get(tag_name, b"unknown").decode("utf8")

    def get_original_uid(self, xaf):
        """Return the original uid of the file.

        The original uid is the first unique id given before step0 execution.
        "unknown" is returned if we can't find the original uid.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original uid.
        """
        tag_name = self._get_original_uid_tag_name()
        return xaf.tags.get(tag_name, b"unknown").decode("utf8")

    def get_original_dirname(self, xaf):
        """Return the original dirname of the file.

        The original dirname is the not modified dirname before step0
        execution. "unknown" is returned if we can't find the dirname.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original dirname.
        """
        tag_name = self._get_original_dirname_tag_name()
        return xaf.tags.get(tag_name, b"unknown").decode("utf8")

    def __sigterm_handler(self, *args):
        """SIGTERM handler: ask the main loop to stop gracefully."""
        self.debug("SIGTERM signal handled => schedulling shutdown")
        self.stop_flag = True