from __future__ import print_function
import os
import xattrfile
import redis
import json
import sys
import configargparse
import datetime
import time
import re
import six
import signal
from functools import partial
import mflog
from mfutil import mkdir_p_or_die, get_unique_hexa_identifier
from mfutil import get_utc_unix_timestamp
from mfutil.plugins import MFUtilPluginBaseNotInitialized
from mfutil.plugins import get_installed_plugins
from acquisition.utils import _set_custom_environment, \
get_plugin_step_directory_path, MODULE_RUNTIME_HOME, _get_tmp_filepath, \
_make_config_file_parser_class, _get_or_make_trash_dir
from acquisition.stats import get_stats_client
# Default maximum value of the step counter tag (see AcquisitionStep.step_limit).
DEFAULT_STEP_LIMIT = 1000
# Default redis endpoint; the port can be overridden with the
# MFDATA_REDIS_PORT environment variable (read once, at import time).
DEFAULT_REDIS_HOST = "127.0.0.1"
DEFAULT_REDIS_PORT = int(os.environ.get("MFDATA_REDIS_PORT", "1234"))
# Name of the plugin which receives a copy of every file entering a step.
DEBUG_PLUGIN_NAME = "debug"
try:
    # True when a plugin named DEBUG_PLUGIN_NAME is listed as installed.
    DEBUG_PLUGIN_INSTALLED = (
        DEBUG_PLUGIN_NAME in [plugin['name']
                              for plugin in get_installed_plugins()]
    )
except MFUtilPluginBaseNotInitialized:
    # The plugin base is not initialized: behave as if the debug plugin
    # was not installed.
    DEBUG_PLUGIN_INSTALLED = False
class AcquisitionStep(object):
    """Abstract class to describe an acquisition step.

    You have to override this class.

    Attributes:
        debug_mode_allowed (boolean): if True, the debug mode is allowed.
        stop_flag (boolean): if True, stop the daemon as soon as possible.
        unit_tests (boolean): if True, we are in unit tests mode.
        unit_tests_args (list): CLI arguments parsed instead of the real
            command line when unit_tests is True.
        failure_policy (string): failure policy.
        failure_policy_move_dest_dir (string): destination directory when
            failure policy is move
        failure_policy_move_keep_tags (boolean): keep tags into another file
            when failure policy is move ?
        failure_policy_move_keep_tags_suffix (string): suffix to add to the
            filename to keep tags when failure policy is move
        step_limit (int): maximum value of the step counter tag; beyond it
            the file is dropped to avoid a recursion loop.
        args (Namespace): argparser Namespace object with parsed cli args.
        __logger (Logger): Logger object.
        __last_ping (Datetime): Datetime object of the last ping() call.

    """

    stop_flag = False
    debug_mode_allowed = True
    unit_tests = False
    unit_tests_args = None
    failure_policy = None
    failure_policy_move_dest_dir = None
    failure_policy_move_keep_tags = True
    failure_policy_move_keep_tags_suffix = None
    step_limit = DEFAULT_STEP_LIMIT
    args = None
    __logger = None
    __last_ping = None
    _shadow = False
    _debug_mode = False

    def __init__(self):
        """Constructor.

        Check that step_name and plugin_name match `^[A-Za-z0-9_]+$`
        (the process exits otherwise) and set the custom environment
        for this plugin/step.
        """
        step_name = self.step_name
        plugin_name = self.plugin_name
        regexp = "^[A-Za-z0-9_]+$"
        if not re.match(regexp, step_name):
            self.error_and_die("step_name: %s must match with %s",
                               step_name, regexp)
        if not re.match(regexp, plugin_name):
            self.error_and_die("plugin_name: %s must match with %s",
                               plugin_name, regexp)
        _set_custom_environment(plugin_name, step_name)

    def _init(self):
        """Parse CLI arguments, configure the failure policy and call init().

        In unit tests mode (with unit_tests_args set), unit_tests_args are
        parsed instead of the real command line. A SIGTERM handler is
        installed to schedule a clean shutdown.
        """
        parser = self.__get_argument_parser()
        if self.unit_tests and self.unit_tests_args:
            self.args = parser.parse_args(self.unit_tests_args)
        else:
            self.args = parser.parse_args()
        self.failure_policy = self.args.failure_policy
        if self.failure_policy not in ('keep', 'delete', 'move'):
            self.error_and_die("unknown failure policy: %s",
                               self.failure_policy)
        if self.failure_policy == 'move':
            fpmdd = self.args.failure_policy_move_dest_dir
            if fpmdd is None:
                self.error_and_die('you have to set a '
                                   'failure-policy-move-dest-dir'
                                   ' in case of move failure policy')
            mkdir_p_or_die(fpmdd)
            # Expose the destination directory through the documented
            # attribute (it used to stay at None).
            self.failure_policy_move_dest_dir = fpmdd
            self.failure_policy_move_keep_tags = \
                self.args.failure_policy_move_keep_tags
            self.failure_policy_move_keep_tags_suffix = \
                self.args.failure_policy_move_keep_tags_suffix
        signal.signal(signal.SIGTERM, self.__sigterm_handler)
        self.init()

    def __sigterm_handler(self, *args):
        """Handle SIGTERM by raising stop_flag (no immediate exit)."""
        self.debug("SIGTERM signal handled => schedulling shutdown")
        self.stop_flag = True

    def __get_logger(self):
        """Return the (lazily created) logger of this plugin/step."""
        if not self.__logger:
            logger_name = "mfdata.%s.%s" % (self.plugin_name, self.step_name)
            self.__logger = mflog.getLogger(logger_name)
        return self.__logger

    def get_tmp_filepath(self):
        """Get a full temporary filepath (including unique filename).

        Returns:
            (string) full temporary filepath (including unique filename).

        """
        return _get_tmp_filepath(self.plugin_name, self.step_name)

    def _exception_safe_call(self, func, args, kwargs, label,
                             return_value_if_exception):
        """Call func(*args, **kwargs) and never let an Exception escape.

        Args:
            func (callable): function to call.
            args (list): positional arguments for func.
            kwargs (dict): keyword arguments for func.
            label (string): label used in the log message on failure.
            return_value_if_exception: value returned if func raises.

        Returns:
            the func return value, or return_value_if_exception if an
            Exception was raised (the exception is logged).

        """
        try:
            return func(*args, **kwargs)
        except Exception:
            self.exception("exception during %s" % label)
            return return_value_if_exception

    def _process(self, xaf):
        """Run the whole processing pipeline on one file.

        The pipeline is: _before(), process() (exception safe), _after(),
        with stats and logs around it.

        Args:
            xaf (XattrFile): file to process.

        Returns:
            boolean: False if _before() refused the file, else the
            processing status returned by _after().

        """
        xaf._original_filepath = xaf.filepath
        self.info("Start the processing of %s...", xaf._original_filepath)
        timer = self.get_stats_client().timer("processing_file_timer")
        timer.start()
        before_status = self._before(xaf)
        if before_status is False:
            # The file was not taken into account: do not send the timer.
            timer.stop(send=False)
            self.info("End of the %s processing after %i ms "
                      "(before_status=False)", xaf._original_filepath,
                      timer.ms)
            return False
        # The size is read before process() is given the file.
        size = xaf.getsize()
        process_status = \
            self._exception_safe_call(self.process, [xaf], {},
                                      "process of %s" % xaf._original_filepath,
                                      False)
        after_status = self._after(xaf, process_status)
        self.get_stats_client().incr("number_of_processed_files", 1)
        self.get_stats_client().incr("bytes_of_processed_files", size)
        if not after_status:
            self.get_stats_client().incr("number_of_processing_errors", 1)
        timer.stop()
        self.info("End of the %s processing after %i ms",
                  xaf._original_filepath,
                  timer.ms)
        if not after_status:
            self.warning("Bad processing status for file: %s",
                         xaf._original_filepath)
            logger = self.__get_logger()
            if logger.isEnabledFor(10):  # DEBUG
                xaf.dump_tags_on_logger(logger, 10)  # DEBUG
        return after_status

    def _conditional_copy_to_debug_plugin(self, xaf):
        """Copy the file to the debug plugin if installed (and not shadow)."""
        if DEBUG_PLUGIN_INSTALLED:
            if not self._shadow:
                self.copy_to_plugin_step(xaf, DEBUG_PLUGIN_NAME, "main")

    def _before(self, xaf):
        """Prepare a file just before the process() call.

        The file is moved to a temporary filepath, "before" tags are set and
        the step counter is checked against step_limit.

        Args:
            xaf (XattrFile): file to prepare.

        Returns:
            boolean: True if the file can be processed, False if it could
            not be moved or if the step limit is exceeded.

        """
        tmp_filepath = self.get_tmp_filepath()
        self.debug("Move %s to %s", xaf.filepath, tmp_filepath)
        try:
            xaf.rename(tmp_filepath)
        except (IOError, OSError):
            self.debug("Can't move %s to %s: file does not "
                       "exist any more ?", xaf.filepath,
                       tmp_filepath)
            return False
        xaf._before_process_filepath = xaf.filepath
        self._set_before_tags(xaf)
        self._conditional_copy_to_debug_plugin(xaf)
        if self._get_counter_tag_value(xaf) > self.step_limit:
            self.warning("step_value bigger than step_limit [%i] => "
                         "deleting file to avoid a recursion loop ?" %
                         self.step_limit)
            # Really delete the file (as logged): it has already been moved
            # to the temporary directory and nobody else would clean it.
            xaf.delete_or_nothing()
            return False
        return True

    def _set_after_tags(self, xaf, process_status):
        """Set "exit_step" and "process_status" tags (not in shadow mode)."""
        if not self._shadow:
            self.set_tag(xaf, "exit_step",
                         self._current_utc_datetime_with_ms(),
                         add_latest=False)
            if process_status:
                self.set_tag(xaf, "process_status", "ok", add_latest=False)
            else:
                self.set_tag(xaf, "process_status", "nok", add_latest=False)

    def _set_before_tags(self, xaf):
        """Increment the step counter and set entry tags (not in shadow mode).

        The original basename/uid/dirname tags are only set if missing.
        """
        if not self._shadow:
            current = self._current_utc_datetime_with_ms()
            self.__increment_and_set_counter_tag_value(xaf)
            self.set_tag(xaf, "enter_step", current, add_latest=False)
            self.__set_original_basename_if_necessary(xaf)
            self.__set_original_uid_if_necessary(xaf)
            self.__set_original_dirname_if_necessary(xaf)

    def _current_utc_datetime_with_ms(self):
        """Return the current UTC datetime as a string (with microseconds)."""
        return datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S:%f')

    def _trash(self, xaf):
        """Apply the failure policy to a file.

        - delete: the file is deleted.
        - keep: the file is moved (or copied) to the trash directory of the
          plugin/step, its tags are written in a ".tags" side file.
        - move: the file is moved (or copied) to the configured destination
          directory, its tags are written in a side file if configured so.

        In "keep" and "move" policies, the file is deleted if it can't be
        moved or copied.

        Args:
            xaf (XattrFile): file to trash.

        """
        if self.failure_policy == "delete":
            xaf.delete_or_nothing()
        elif self.failure_policy == "keep":
            new_filepath = \
                os.path.join(_get_or_make_trash_dir(self.plugin_name,
                                                    self.step_name),
                             xaf.basename())
            (success, move) = xaf.move_or_copy(new_filepath)
            if success:
                xaf.write_tags_in_a_file(new_filepath + ".tags")
                xattrfile.XattrFile(new_filepath).clear_tags()
            else:
                xaf.delete_or_nothing()
        elif self.failure_policy == "move":
            new_filepath = os.path.join(self.args.failure_policy_move_dest_dir,
                                        xaf.basename())
            (success, move) = xaf.move_or_copy(new_filepath)
            if success:
                if self.failure_policy_move_keep_tags:
                    suffix = self.failure_policy_move_keep_tags_suffix
                    xaf.write_tags_in_a_file(new_filepath + suffix)
                    xattrfile.XattrFile(new_filepath).clear_tags()
            else:
                xaf.delete_or_nothing()

    def _after(self, xaf, process_status):
        """Finalize a file just after the process() call.

        On success, the temporary file is deleted if process() left it in
        place. On failure, "after" tags are set and the failure policy is
        applied. The after() hook is then called (exception safe).

        Args:
            xaf (XattrFile): processed file.
            process_status: status returned by process().

        Returns:
            process_status (unchanged).

        """
        if process_status:
            if hasattr(xaf, "_before_process_filepath"):
                if xaf._before_process_filepath:
                    # Same path as before process(): the file was neither
                    # moved nor renamed by process(), so we clean it.
                    if xaf._before_process_filepath == xaf.filepath:
                        xaf.delete_or_nothing()
        else:
            self._set_after_tags(xaf, process_status)
            self._trash(xaf)
        self._exception_safe_call(self.after, [process_status], {},
                                  "after %s" % xaf.filepath, False)
        return process_status

    def _ping(self):
        """Record the ping datetime and call ping() (exception safe)."""
        self.__last_ping = datetime.datetime.utcnow()
        self._exception_safe_call(self.ping, [], {}, "ping", False)

    def __ping_if_needed(self):
        """Call _ping() if the last one is older than 1 second.

        Nothing is done when stop_flag is set.
        """
        if self.stop_flag:
            return
        if self.__last_ping is None:
            return self._ping()
        delta = (datetime.datetime.utcnow() - self.__last_ping).total_seconds()
        if delta > 1:
            self._ping()

    def __run_in_daemon_mode(self, redis_host, redis_port, redis_queue,
                             redis_unix_socket_path=None):
        """Pop file events from a redis queue and process them until stopped.

        Args:
            redis_host (string): redis host to connect to.
            redis_port: redis port to connect to.
            redis_queue (string): name of the redis list to pop events from.
            redis_unix_socket_path (string): path to the redis unix socket
                (used instead of redis_host/redis_port if set).

        """
        self.info("Start the daemon mode with redis_queue=%s", redis_queue)
        if redis_unix_socket_path:
            r = redis.StrictRedis(unix_socket_path=redis_unix_socket_path)
        else:
            r = redis.StrictRedis(host=redis_host, port=redis_port)
        redis_connection_exceptions_counter = 0
        while not self.stop_flag:
            self.__ping_if_needed()
            try:
                msg = r.brpop(redis_queue, 1)
            except KeyboardInterrupt:
                self.stop_flag = True
                continue
            except redis.exceptions.ConnectionError:
                redis_connection_exceptions_counter = \
                    redis_connection_exceptions_counter + 1
                if redis_connection_exceptions_counter >= 10:
                    self.critical("Can't connect to redis after 10s => exit")
                    self.stop_flag = True
                    continue
                else:
                    self.debug("Can't connect to redis => "
                               "I will try again after 1s sleep")
                    time.sleep(1)
                    continue
            # The brpop call succeeded: reset the consecutive errors counter.
            redis_connection_exceptions_counter = 0
            if msg is None:
                # brpop timeout
                continue
            try:
                decoded_msg = json.loads(msg[1].decode('utf8'))
                filepath = os.path.join(decoded_msg['directory'],
                                        decoded_msg['filename'])
            except Exception:
                self.warning("wrong message type popped on bus "
                             "=> stopping to force a restart")
                self.stop_flag = True
                continue
            if not os.path.exists(filepath):
                self.debug("filepath: %s does not exist anymore "
                           "=> ignoring event",
                           filepath)
                continue
            xaf = xattrfile.XattrFile(filepath)
            counter = "counter.%s.%s" % (self.plugin_name, self.step_name)
            latest = "latest.%s.%s" % (self.plugin_name, self.step_name)
            # Redis bookkeeping is best-effort: a failure here must not
            # prevent the file from being processed.
            try:
                r.incr(counter)
            except Exception:
                self.warning("can't update redis counter: %s" % counter)
            try:
                r.set(latest, str(get_utc_unix_timestamp()))
            except Exception:
                self.warning("can't set latest timestamp: %s" % latest)
            self._process(xaf)
        self.debug("Stop to brpop queue %s" % redis_queue)

    def __run_in_debug_mode(self, filepath):
        """Process a copy of a single file with the most verbose log level.

        Args:
            filepath (string): full path of the file to process (the
                original file is copied, not moved).

        Returns:
            boolean: the processing status.

        """
        self.info("Start the debug mode with filepath=%s", filepath)
        self.__get_logger().setLevel(0)
        self._debug_mode = True
        self._ping()
        tmp_filepath = self.get_tmp_filepath()
        original_xaf = xattrfile.XattrFile(filepath)
        xaf = original_xaf.copy(tmp_filepath)
        return self._process(xaf)

    def __get_argument_parser(self):
        """Make and return an ArgumentParser object.

        If you want to add some extra options, you have to override
        the add_extra_arguments() method.

        Returns:
            an ArgumentParser object with all options added.

        """
        description = "%s/%s acquisition step" % (self.plugin_name,
                                                  self.step_name)
        parser = configargparse.ArgumentParser(
            description=description,
            add_env_var_help=False,
            ignore_unknown_config_file_keys=True,
            args_for_setting_config_path=["-c", "--config-file"],
            config_file_parser_class=partial(_make_config_file_parser_class,
                                             self.plugin_name,
                                             self.step_name)
        )
        parser.add_argument('--redis-host', action='store',
                            default='127.0.0.1',
                            help='redis host to connect to (in daemon mode)')
        parser.add_argument('--redis-port', action='store', default=6379,
                            help='redis port to connect to (in daemon mode)')
        parser.add_argument('--redis-unix-socket-path', action='store',
                            default=None,
                            help='path to redis unix socket (overrides '
                            'redis-host and redis-port if set)')
        parser.add_argument('--failure-policy', action='store', default="keep",
                            help='failure policy (keep, delete or move)')
        parser.add_argument('--failure-policy-move-dest-dir', action='store',
                            default=None,
                            help='dest-dir in case of move failure policy')
        parser.add_argument('--failure-policy-move-keep-tags', action='store',
                            type=int, default=1,
                            help='keep tags into another file in case of '
                            'move failure policy ?')
        parser.add_argument('--failure-policy-move-keep-tags-suffix',
                            action='store', default=".tags",
                            help='suffix to add to the filename in case of '
                            'move failure policy keep tags')
        self.add_extra_arguments(parser)
        parser.add_argument('FULL_FILEPATH_OR_QUEUE_NAME',
                            action='store',
                            help='if starts with /, we consider we are '
                            'in debug mode, if not, we consider we are in '
                            'daemon mode')
        return parser

    def get_stats_client(self, extra_tags=None):
        """Return a stats client for this plugin/step.

        Args:
            extra_tags (dict): extra tags given to the stats client
                (default: no extra tags).

        Returns:
            the object returned by acquisition.stats.get_stats_client().

        """
        # None sentinel instead of a shared mutable default argument.
        if extra_tags is None:
            extra_tags = {}
        return get_stats_client(self.plugin_name, self.step_name,
                                extra_tags)

    @property
    def step_name(self):
        """Get the name of the step.

        This method is called if there is no "step_name" property defined.
        This said property SHOULD be defined.
        The name must match with `^[A-Za-z0-9_]+$` regexp.

        Returns:
            (string) the name of the step.

        """
        return "main"

    @property
    def plugin_name(self):
        """Return the name of the plugin.

        This method is called if there is no "plugin_name" property defined.
        This said property MUST be defined.
        The name must match with `^[A-Za-z0-9_]+$` regexp.

        Returns:
            (string) the name of the plugin.

        Raises:
            NotImplementedError: always (the property must be overridden).

        """
        raise NotImplementedError("The plugin_name property is not defined."
                                  " Please define a plugin_name property.")

    def get_plugin_directory_path(self):
        """Return the plugin directory (fullpath).

        Returns:
            (string) the fullpath of the plugin directory.

        """
        return os.path.join(MODULE_RUNTIME_HOME,
                            'var', 'plugins', self.plugin_name)

    def move_to_plugin_step(self, xaf, plugin_name, step_name):
        """Move a XattrFile to another plugin/step.

        Args:
            xaf (XattrFile): XattrFile to move
            plugin_name (string): plugin name
            step_name (string): step name

        Returns:
            boolean: True if ok

        """
        target_path = os.path.join(get_plugin_step_directory_path(plugin_name,
                                                                  step_name),
                                   get_unique_hexa_identifier())
        self._set_after_tags(xaf, True)
        result, _ = xaf.move_or_copy(target_path)
        return result

    def copy_to_plugin_step(self, xaf, plugin_name, step_name):
        """Copy a XattrFile (with tags) to another plugin/step.

        Args:
            xaf (XattrFile): XattrFile to copy
            plugin_name (string): plugin name
            step_name (string): step name

        Returns:
            boolean: True if ok

        """
        target_path = os.path.join(get_plugin_step_directory_path(plugin_name,
                                                                  step_name),
                                   get_unique_hexa_identifier())
        self._set_after_tags(xaf, True)
        result = xaf.copy_or_nothing(target_path)
        return result

    def process(self, xaf):
        """Process one file.

        Process one XattrFile. You have to override this method (unless your
        class inherits from BatchStep, see batch_process() in that case).

        The file is moved into a temporary directory before the call with
        an unique filename. Extended attributes are copied to it.
        So you can do what you want with it.

        If the method returns True:

        - we consider that the processing is ok
        - the file is deleted if necessary

        If the method doesn't return True:

        - we consider that the processing is not ok (a warning message is
          logged).
        - the file is given to the chosen failure policy.

        Args:
            xaf : XattrFile object.

        Returns:
            boolean: processing status (True: ok, False: not ok)

        """
        raise NotImplementedError("process() method must be overriden")

    def after(self, status):
        """Method called after the process execution.

        It's called even in case of exceptions during process.

        Args:
            status: processing status.

        """
        return

    def ping(self):
        """Do something every second if possible.

        The call can be blocked by a long running process() call.
        """
        return

    def init(self):
        """Method called after CLI parsing but before processing any files."""
        pass

    def _destroy(self):
        """Call the destroy() hook."""
        self.destroy()

    def destroy(self):
        """Destroy what you want just before exiting.

        No file will be processed after calling this method.
        """
        pass

    def run(self):
        """Run the step (main entry point).

        After the CLI parsing, the step runs in debug mode if the
        FULL_FILEPATH_OR_QUEUE_NAME argument starts with "/" (it is then
        a filepath), in daemon mode otherwise (it is then a redis queue
        name). destroy() is called at the end.
        """
        self._init()
        if self.args.FULL_FILEPATH_OR_QUEUE_NAME.startswith('/'):
            if not self.debug_mode_allowed:
                self.error_and_die("debug mode is not allowed for this step")
            self.__run_in_debug_mode(self.args.FULL_FILEPATH_OR_QUEUE_NAME)
        else:
            self.__run_in_daemon_mode(self.args.redis_host,
                                      self.args.redis_port,
                                      self.args.FULL_FILEPATH_OR_QUEUE_NAME,
                                      self.args.redis_unix_socket_path)
        self._destroy()

    def warning(self, msg, *args, **kwargs):
        """Log a warning message."""
        logger = self.__get_logger()
        logger.warning(msg, *args, **kwargs)

    def debug(self, msg, *args, **kwargs):
        """Log a debug message."""
        logger = self.__get_logger()
        logger.debug(msg, *args, **kwargs)

    def info(self, msg, *args, **kwargs):
        """Log an info message."""
        logger = self.__get_logger()
        logger.info(msg, *args, **kwargs)

    def critical(self, msg, *args, **kwargs):
        """Log a critical message."""
        logger = self.__get_logger()
        logger.critical(msg, *args, **kwargs)

    def error(self, msg, *args, **kwargs):
        """Log an error message."""
        logger = self.__get_logger()
        logger.error(msg, *args, **kwargs)

    def error_and_die(self, msg, *args, **kwargs):
        """Log an error message and exit immediately (exit code: 2)."""
        self.error(msg, *args, **kwargs)
        sys.stderr.write("exiting...\n")
        os._exit(2)

    def exception(self, msg, *args, **kwargs):
        """Log an error message with current exception stacktrace.

        This method should only be called from an exception handler.
        """
        logger = self.__get_logger()
        logger.exception(str(msg), *args, **kwargs)

    def __get_tag_name(self, name, counter_str_value='latest',
                       force_step_name=None, force_plugin_name=None):
        """Build the full (prefixed) name of a tag.

        The result is "<counter>.<plugin>.<step>.<name>", or
        "<counter>.core.<name>" (no step part) for the "core" plugin.

        Args:
            name (string): name of the tag (without prefixes).
            counter_str_value (string): counter prefix.
            force_step_name (string): step name (if None, current step name
                is taken).
            force_plugin_name (string): plugin name (if None, current plugin
                name is taken).

        Returns:
            (string) full tag name.

        """
        plugin_name = self.plugin_name
        step_name = self.step_name
        if force_step_name is not None:
            step_name = force_step_name
        if force_plugin_name is not None:
            plugin_name = force_plugin_name
        if plugin_name == "core":
            return "%s.%s.%s" % (counter_str_value, plugin_name, name)
        else:
            return "%s.%s.%s.%s" % (counter_str_value, plugin_name,
                                    step_name, name)

    def _set_tag_latest(self, xaf, name, value):
        """Set a tag with the "latest" prefix."""
        tag_name = self.__get_tag_name(name, 'latest')
        self.__set_tag(xaf, tag_name, value)

    def set_tag(self, xaf, name, value, add_latest=True):
        """Set a tag on a file with good prefixes.

        Args:
            xaf (XattrFile): file to add/set tag on.
            name (string): name of the tag (without prefixes)
            value (string): value of the tag
            add_latest (boolean): add latest prefix

        """
        counter_str_value = str(self._get_counter_tag_value(xaf))
        tag_name = self.__get_tag_name(name, counter_str_value)
        self.__set_tag(xaf, tag_name, value)
        if add_latest:
            self._set_tag_latest(xaf, name, value)

    def get_tag(self, xaf, name, not_found_value=None,
                counter_str_value='latest', force_step_name=None,
                force_plugin_name=None):
        """Read a tag on a file with good prefixes.

        Args:
            xaf (XattrFile): file to read.
            name (string): name of the tag (without prefixes).
            not_found_value: returned value if the tag is not found.
            counter_str_value (string): counter string value.
            force_step_name: tagger step name (if None, current
                step name is taken)
            force_plugin_name: tagger plugin name (if None, current
                plugin name is taken)

        Returns:
            the tag value, or not_found_value if the tag is not found.

        """
        tag_name = self.__get_tag_name(name, counter_str_value,
                                       force_step_name, force_plugin_name)
        return xaf.tags.get(tag_name, not_found_value)

    def _get_counter_tag_value(self, xaf, not_found_value='0'):
        """Return the step counter of a file as an int.

        Args:
            xaf (XattrFile): file to read.
            not_found_value: value used (before the int conversion) if the
                counter tag is not found.

        """
        tag_name = self.__get_tag_name("step_counter",
                                       force_plugin_name="core")
        return int(xaf.tags.get(tag_name, not_found_value))

    def __increment_and_set_counter_tag_value(self, xaf):
        """Increment the step counter tag of a file (0 if it was missing)."""
        tag_name = self.__get_tag_name("step_counter",
                                       force_plugin_name="core")
        # -1 as not found value => the first counter value is 0.
        counter_value = self._get_counter_tag_value(xaf, not_found_value='-1')
        value = six.b("%i" % (counter_value + 1))
        self.__set_tag(xaf, tag_name, value)

    def __set_tag(self, xaf, name, value):
        """Set a tag on a file (with its full name) and log it."""
        self.debug("Setting tag %s = %s" % (name, value))
        xaf.tags[name] = value

    def __get_original_basename_tag_name(self):
        """Return the full name of the "original_basename" tag."""
        return self.__get_tag_name("original_basename",
                                   force_plugin_name="core",
                                   counter_str_value="first")

    def __get_original_uid_tag_name(self):
        """Return the full name of the "original_uid" tag."""
        return self.__get_tag_name("original_uid",
                                   force_plugin_name="core",
                                   counter_str_value="first")

    def __get_original_dirname_tag_name(self):
        """Return the full name of the "original_dirname" tag."""
        return self.__get_tag_name("original_dirname",
                                   force_plugin_name="core",
                                   counter_str_value="first")

    def __set_original_basename_if_necessary(self, xaf):
        """Set the original basename tag if missing (and if known)."""
        if hasattr(xaf, "_original_filepath") and xaf._original_filepath:
            tag_name = self.__get_original_basename_tag_name()
            if tag_name not in xaf.tags:
                original_basename = \
                    str(os.path.basename(xaf._original_filepath))
                self.__set_tag(xaf, tag_name, original_basename)

    def __set_original_uid_if_necessary(self, xaf):
        """Set the original uid tag (new unique identifier) if missing."""
        tag_name = self.__get_original_uid_tag_name()
        if tag_name not in xaf.tags:
            original_uid = get_unique_hexa_identifier()
            self.__set_tag(xaf, tag_name, original_uid)

    def __set_original_dirname_if_necessary(self, xaf):
        """Set the original dirname tag if missing (and if known).

        Only the last component of the directory path is stored.
        """
        if hasattr(xaf, "_original_filepath") and xaf._original_filepath:
            tag_name = self.__get_original_dirname_tag_name()
            if tag_name not in xaf.tags:
                dirname = os.path.dirname(xaf._original_filepath)
                original_dirname = \
                    str(os.path.basename(dirname))
                self.__set_tag(xaf, tag_name, original_dirname)

    def get_original_basename(self, xaf):
        """Return the original basename of the file.

        The original basename is the not modified basename before step0
        execution. "unknown" is returned if we can't find the basename.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original basename.

        """
        tag_name = self.__get_original_basename_tag_name()
        return xaf.tags.get(tag_name, b"unknown").decode("utf8")

    def get_original_uid(self, xaf):
        """Return the original uid of the file.

        The original uid is the first unique id given before step0 execution.
        "unknown" is returned if we can't find the original uid.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original uid.

        """
        tag_name = self.__get_original_uid_tag_name()
        return xaf.tags.get(tag_name, b"unknown").decode("utf8")

    def get_original_dirname(self, xaf):
        """Return the original dirname of the file.

        The original dirname is the name (last path component only) of the
        directory which contained the file before step0 execution.
        "unknown" is returned if we can't find the dirname.

        Args:
            xaf (XattrFile): file to consider.

        Returns:
            string: original dirname.

        """
        tag_name = self.__get_original_dirname_tag_name()
        return xaf.tags.get(tag_name, b"unknown").decode("utf8")