"""Generic utility classes and functions."""
import uuid
import errno
import os
import logging
import tempfile
import socket
import bash
import psutil
import pickle
import hashlib
import datetime
import time
import fnmatch
# FIXME: deprecated
from mfutil.deprecated import get_bash_output_or_die # noqa: F401
from mfutil.deprecated import get_bash_output_or_warning # noqa: F401
from inotify_simple import flags
[docs]class MFUtilException(Exception):
"""Just a custom exception object dedicated for mfutil package."""
def __get_logger():
return logging.getLogger("mfutil")
[docs]def get_unique_hexa_identifier():
"""Return an unique hexa identifier on 32 bytes.
The idenfier is made only with 0123456789abcdef
characters.
Returns:
(string) unique hexa identifier.
"""
return str(uuid.uuid4()).replace('-', '')
[docs]def get_utc_unix_timestamp():
"""Return the current unix UTC timestamp on all platforms.
It works even if the machine is configured in local time.
Returns:
(int) a int corresponding to the current unix utc timestamp.
"""
dts = datetime.datetime.utcnow()
return int(time.mktime(dts.timetuple()))
[docs]def mkdir_p(path, nodebug=False, nowarning=False):
"""Make a directory recursively (clone of mkdir -p).
Thanks to http://stackoverflow.com/questions/600268/
mkdir-p-functionality-in-python .
Any exceptions are catched and a warning message
is logged in case of problems.
If the directory already exists, True is returned
with no debug or warning.
Args:
path (string): complete path to create.
nodebug (boolean): if True, no debug messages are logged.
nowarning (boolean): if True, no message are logged in
case of problems.
Returns:
boolean: True if the directory exists at the end.
"""
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST and os.path.isdir(path):
return True
else:
if not nowarning:
__get_logger().warning("can't create %s directory", path)
return False
if not nodebug:
__get_logger().debug("%s directory created", path)
return True
[docs]def mkdir_p_or_die(path, nodebug=False, exit_code=2):
"""Make a directory recursively (clone of mkdir -p).
If the directory already exists, True is returned
with no debug or warning.
Any exceptions are catched.
In case of problems, the program dies here with corresponding
exit_code.
Args:
path (string): complete path to create.
nodebug (boolean): if True, no debug messages are logged.
exit_code (int): os._exit() exit code.
"""
res = mkdir_p(path, nodebug=nodebug, nowarning=True)
if not res:
__get_logger().error("can't create %s directory", path)
os._exit(exit_code)
def _get_temp_dir(tmp_dir=None):
"""Return system temp dir or used choosen temp dir.
If the user provides a tmp_dir argument, the
directory is created (if necessary).
If the user don't provide a tmp_dir argument,
the function returns a system temp dir.
If the directory is not good or can be created,
an exception is raised.
Args:
tmp_dir (string): user provided tmp directory (None
to use the system temp dir).
Returns:
(string) temp directory
Raises:
MFUtilException if the temp directory is not good or can't
be created.
"""
if tmp_dir is None:
tmp_dir = tempfile.gettempdir()
res = mkdir_p(tmp_dir)
if not res:
raise MFUtilException("can't create temp_dir: %s", tmp_dir)
return tmp_dir
[docs]def get_tmp_filepath(tmp_dir=None, prefix=""):
"""Return a tmp (complete) filepath.
The filename is made with get_unique_hexa_identifier() identifier
so 32 hexa characters.
The dirname can be provided by the user (or be a system one).
He will be created if necessary. An exception can be raised if any
problems at this side.
Note: the file is not created or open at all. The function just
returns a filename.
Args:
tmp_dir (string): user provided tmp directory (None
to use the system temp dir).
prefix (string): you can add here a prefix for filenames
(will be preprended before the 32 hexa characters).
Returns:
(string) tmp (complete) filepath.
Raises:
MFUtilException if the temp directory is not good or can't
be created.
"""
temp_dir = _get_temp_dir(tmp_dir)
return os.path.join(temp_dir, prefix + get_unique_hexa_identifier())
[docs]def create_tmp_dirpath(tmp_dir=None, prefix=""):
"""Create and return a temporary directory inside a father tempory directory.
The dirname is made with get_unique_hexa_identifier() identifier
so 32 hexa characters.
The father dirname can be provided by the user (or be a system one).
He will be created if necessary. An exception can be raised if any
problems at this side.
Note: the temporary directory is created.
Args:
tmp_dir (string): user provided tmp directory (None
to use the system temp dir).
prefix (string): you can add here a prefix for dirnames
(will be preprended before the 32 hexa characters).
Returns:
(string) complete path of a newly created temporary directory.
Raises:
MFUtilException if the temp directory can't be created.
"""
temp_dir = _get_temp_dir(tmp_dir)
new_temp_dir = os.path.join(temp_dir,
prefix + get_unique_hexa_identifier())
res = mkdir_p(new_temp_dir, nowarning=True)
if not res:
raise MFUtilException("can't create temp_dir: %s", new_temp_dir)
return new_temp_dir
[docs]def get_ipv4_for_hostname(hostname, static_mappings={}):
"""Translate a host name to IPv4 address format.
The IPv4 address is returned as a string, such as '100.50.200.5'.
If the host name is an IPv4 address itself it is returned unchanged.
You can provide a dictionnary with static mappings.
Following mappings are added by default:
'127.0.0.1' => '127.0.0.1'
'localhost' => '127.0.0.1'
'localhost.localdomain' => '127.0.0.1'
Args:
hostname (string): hostname.
static_mappings (dict): dictionnary of static mappings
((hostname) string: (ip) string).
Returns:
(string) IPv4 address for the given hostname (None if any problem)
"""
hostname = hostname.lower()
static_mappings.update({'127.0.0.1': '127.0.0.1', 'localhost': '127.0.0.1',
'localhost.localdomain': '127.0.0.1'})
if hostname in static_mappings:
return static_mappings[hostname]
try:
return socket.gethostbyname(hostname)
except Exception:
return None
[docs]def get_recursive_mtime(directory, ignores=[]):
"""Get the latest mtime recursivly on a directory.
Args:
directory (string): complete path of a directory to scan.
ignores (list of strings): list of shell-style wildcards
to define which filenames/dirnames to ignores (see fnmatch).
Returns:
(int) timestamp of the latest mtime on the directory.
"""
result = 0
for name in os.listdir(directory):
ignored = False
for ssw in ignores:
if fnmatch.fnmatch(name, ssw):
ignored = True
break
if ignored:
continue
fullpath = os.path.join(directory, name)
if os.path.isdir(fullpath):
mtime = get_recursive_mtime(fullpath, ignores=ignores)
else:
mtime = 0
try:
mtime = int(os.path.getmtime(fullpath))
except Exception:
pass
if mtime > result:
result = mtime
return result
[docs]def add_inotify_watch(inotify, directory, ignores=[]):
"""Register recursively directories to watch.
Args:
inotify (inotify object): object that owns the file descriptors
directory (string): complete path of a directory to scan.
ignores (list of strings): list of shell-style wildcards
to define which filenames/dirnames to ignores (see fnmatch).
"""
watch_flags = flags.MODIFY | flags.CREATE |\
flags.DELETE | flags.DELETE_SELF
try:
__get_logger().info("watch %s" % directory)
inotify.add_watch(directory, watch_flags)
except Exception as e:
__get_logger().warning("cannot watch %s: %s" % (directory, e))
if not os.access(directory, os.R_OK):
__get_logger().warning("cannot enter into %s" % directory)
return
for name in os.listdir(directory):
ignored = False
for ssw in ignores:
if fnmatch.fnmatch(name, ssw):
ignored = True
break
if ignored:
continue
fullpath = os.path.join(directory, name)
if os.path.isdir(fullpath):
add_inotify_watch(inotify, fullpath, ignores=ignores)
[docs]class BashWrapperException(Exception):
"""Specific exception class for BashWrapper objects."""
__message = None
__bash_wrapper = None
def __init__(self, message, bash_wrapper=None):
"""Constructor.
Args:
message (string): exception message
bash_wrapper (BashWrapper): bash wrapper object
"""
super(BashWrapperException, self).__init__(message)
self.__message = message
self.__bash_wrapper = bash_wrapper
def __repr__(self):
if self.__bash_wrapper is not None:
return "BashWrapperException with message: %s and debug: %s" % \
(self.__message, self.__bash_wrapper)
else:
return "BashWrapperException with message: %s" % self.__message
def __str__(self):
return self.__repr__()
[docs]class BashWrapper(object):
"""Bash command/output wrapper."""
__bash_cmd = None
__bash_result_object = None
def __init__(self, bash_cmd):
"""Constructor.
The constructor executes the given bash command and store result code
and stdout/stderr inside the object.
You can use this object like this::
bash_wrapper = BashWrapper("ls /tmp")
if bash_wrapper:
print("execution was ok (status_code == 0)")
else:
print("execution was not ok (status_code != 0)")
status_code = bash_wrapper.code
stdout_output = bash_wrapper.stdout
stderr_output = bash_wrapper.stderr
print("full representation with command/code/stdout/stderr: %s" %
bash_wapper)
Args:
bash_cmd (string): complete bash command to execute.
"""
self.__bash_cmd = bash_cmd
self.__bash_result_object = bash.bash(bash_cmd)
def __bool__(self):
return (self.__bash_result_object.code == 0)
def __nonzero__(self):
# python2 compatibility
return self.__bool__()
@property
def code(self):
"""Return the status code of the command as int (0 => ok)."""
return self.__bash_result_object.code
@property
def stdout(self):
"""Return the stdout output of the command stripped and utf8 decoded.
Returns:
(string) stdout output (stripped and utf8 decoded).
"""
return self.__bash_result_object.stdout.strip().decode('UTF-8')
@property
def stderr(self):
"""Return the stderr output of the command stripped and utf8 decoded.
Returns:
(string) stderr output (stripped and utf8 decoded).
"""
return self.__bash_result_object.stderr.strip().decode('UTF-8')
def __repr__(self):
result = []
result.append("")
result.append("===== BASH COMMAND =======================")
result.append(self.__bash_cmd)
result.append("===== BASH RETURN CODE ===================")
result.append(str(self.code))
stdout = self.stdout
if stdout and len(stdout) > 0:
result.append("===== BASH STDOUT ========================")
result.append(self.stdout)
stderr = self.stderr
if stderr and len(stderr) > 0:
result.append("===== BASH STDERR ========================")
result.append(self.stderr)
return "\n".join(result)
[docs]class BashWrapperOrRaise(BashWrapper):
"""BashWrapper subclass which raise an exception if status_code != 0."""
def __init__(self, bash_cmd, exception_class=BashWrapperException,
exception_msg="bad return code"):
"""Constructor.
The constructor executes the given bash command and store result code
and stdout/stderr inside the object.
If the status_code is != 0, an exception is raised with all
informations (stdout/stderr/code) inside.
If the status_code is 0, you can use this object like a BashWrapper
one.
Example::
try:
x = BashWrapperOrRaise("ls /foo/bar")
except BashWrapperException as e:
print("exception with all details: %s" % e)
else:
# here, we have x.code == 0
print("stdout: %s" % x.stdout)
Args:
bash_cmd (string): complete bash command to execute.
exception_class (BashWrapperException): exception class to raise
in case of status_code !=0 (must be a subclass of
BashWrapperException).
exception_msg (string): exception message in case of
status_code != 0.
"""
super(BashWrapperOrRaise, self).__init__(bash_cmd)
if self.code != 0:
raise exception_class(exception_msg, self)
def _kill_process_and_children(process):
children = None
try:
children = process.children(recursive=False)
except psutil.NoSuchProcess:
pass
try:
process.kill()
except psutil.NoSuchProcess:
pass
if children is not None:
for child in children:
_kill_process_and_children(child)
[docs]def kill_process_and_children(pid):
"""Kill recursively a complete tree of processes.
Given a pid, this method recursively kills the complete tree (children and
children of each child...) of this process.
The SIGKILL signal is used.
Args:
pid (int): process PID to kill.
"""
try:
process = psutil.Process(pid)
except psutil.NoSuchProcess:
return
_kill_process_and_children(process)
[docs]def hash_generator(*args):
"""Generate a hash from a variable number of arguments as a safe string.
Note that pickle is used so arguments have to be serializable.
Args:
*args: arguments to hash
"""
temp = pickle.dumps(args, pickle.HIGHEST_PROTOCOL)
return hashlib.md5(temp).hexdigest()