Module mfutil.misc
Generic utility classes and functions.
Expand source code
"""Generic utility classes and functions."""
import uuid
import errno
import os
import logging
import tempfile
import socket
import psutil
import pickle
import hashlib
import datetime
import time
import fnmatch
from inotify_simple import flags
from mfutil.exc import MFUtilException
from mfutil.eval import SandboxedEval
# pdoc configuration: mapping an object name to False keeps it out of the
# generated documentation (here: add_inotify_watch).
__pdoc__ = {
    "add_inotify_watch": False
}
def __get_logger():
    """Return the logger registered under the "mfutil" name."""
    logger_name = "mfutil"
    return logging.getLogger(logger_name)
def eval(expr, variables=None):
    """Evaluate a python expression given as a string inside a sandbox.

    The evaluation is delegated to SandboxedEval (built on top of the
    simpleeval library).

    Following functions are available (in expressions):

    - re_match: see match() function of re module
    - re_imatch: insensitive match() function of re module
    - fnmatch.fnmatch: fnmatch() function of fnmatch module

    Args:
        expr (string): the python expression to evaluate.
        variables (dict): if set, names/values injected in the expression.

    Returns:
        the value produced by the sandboxed evaluation of expr.

    """
    sandbox = SandboxedEval(names=variables)
    outcome = sandbox.eval(expr)
    return outcome
def get_unique_hexa_identifier():
    """Return a random unique identifier made of 32 hexadecimal characters.

    The identifier only contains 0123456789abcdef characters (it is a
    random UUID4 without its dashes).

    Returns:
        (string) unique hexa identifier.

    """
    # UUID.hex is the dash-less, lowercase form of str(uuid)
    return uuid.uuid4().hex
def get_utc_unix_timestamp():
    """Return the current unix UTC timestamp on all platforms.

    It works even if the machine is configured in local time.

    Returns:
        (int) the current unix timestamp (whole seconds since the epoch, UTC).

    """
    # time.time() already counts seconds since the epoch (UTC), independently
    # of the local timezone. The previous utcnow() + time.mktime() combination
    # interpreted a UTC broken-down time as *local* time, so the result was
    # shifted by the UTC offset on every machine not configured in UTC.
    return int(time.time())
def mkdir_p(path, nodebug=False, nowarning=False):
    """Create a directory and its missing parents (clone of mkdir -p).

    Thanks to http://stackoverflow.com/questions/600268/
        mkdir-p-functionality-in-python .

    OSError exceptions are caught and a warning message is logged in case
    of problems.

    If the directory already exists, True is returned with no debug or
    warning.

    Args:
        path (string): complete path to create.
        nodebug (boolean): currently unused (no debug message is logged).
        nowarning (boolean): if True, no message is logged in
            case of problems.

    Returns:
        boolean: True if the directory exists at the end.

    """
    try:
        os.makedirs(path)
    except OSError as error:
        already_there = (error.errno == errno.EEXIST
                         and os.path.isdir(path))
        if already_there:
            return True
        if not nowarning:
            __get_logger().warning("can't create %s directory", path)
        return False
    else:
        # No debug message here: the creation is already logged through the
        # circus configuration file.
        return True
def mkdir_p_or_die(path, nodebug=False, exit_code=2):
    """Create a directory recursively or terminate the program.

    If the directory already exists (or has just been created), the
    function simply returns.

    Creation problems are handled by mkdir_p(); when it reports a failure,
    an error is logged and the program dies here with the given exit_code.

    Args:
        path (string): complete path to create.
        nodebug (boolean): forwarded to mkdir_p().
        exit_code (int): os._exit() exit code.

    """
    created = mkdir_p(path, nodebug=nodebug, nowarning=True)
    if created:
        return
    __get_logger().error("can't create %s directory", path)
    # os._exit() leaves immediately, without running cleanup handlers
    os._exit(exit_code)
def _get_temp_dir(tmp_dir=None):
    """Return the system temp dir or the user chosen temp dir.

    If the user provides a tmp_dir argument, the directory is created
    (if necessary).

    If the user doesn't provide a tmp_dir argument, the function returns
    the system temp dir.

    If the directory is not good or can't be created, an exception is raised.

    Args:
        tmp_dir (string): user provided tmp directory (None
            to use the system temp dir).

    Returns:
        (string) temp directory

    Raises:
        MFUtilException if the temp directory is not good or can't
            be created.

    """
    if tmp_dir is None:
        tmp_dir = tempfile.gettempdir()
    if not mkdir_p(tmp_dir):
        # The path is interpolated here: exceptions (unlike logging calls)
        # do not apply %-formatting to extra positional arguments.
        raise MFUtilException("can't create temp_dir: %s" % tmp_dir)
    return tmp_dir
def get_tmp_filepath(tmp_dir=None, prefix=""):
    """Build and return a complete temporary filepath.

    The filename is made with a get_unique_hexa_identifier() identifier,
    so 32 hexa characters (after the optional prefix).

    The directory can be provided by the user (or be the system one).
    It is created if necessary; an exception can be raised if there is
    any problem at this stage.

    Note: the file itself is neither created nor opened. The function
    just returns a path.

    Args:
        tmp_dir (string): user provided tmp directory (None
            to use the system temp dir).
        prefix (string): prefix for the filename
            (prepended before the 32 hexa characters).

    Returns:
        (string) tmp (complete) filepath.

    Raises:
        MFUtilException if the temp directory is not good or can't
            be created.

    """
    parent_dir = _get_temp_dir(tmp_dir)
    filename = prefix + get_unique_hexa_identifier()
    return os.path.join(parent_dir, filename)
def create_tmp_dirpath(tmp_dir=None, prefix=""):
    """Create and return a temporary directory inside a parent temp directory.

    The directory name is made with a get_unique_hexa_identifier()
    identifier, so 32 hexa characters (after the optional prefix).

    The parent directory can be provided by the user (or be the system one).
    It is created if necessary; an exception can be raised if there is
    any problem at this stage.

    Note: the temporary directory is created.

    Args:
        tmp_dir (string): user provided tmp directory (None
            to use the system temp dir).
        prefix (string): prefix for the directory name
            (prepended before the 32 hexa characters).

    Returns:
        (string) complete path of a newly created temporary directory.

    Raises:
        MFUtilException if the temp directory can't be created.

    """
    parent_dir = _get_temp_dir(tmp_dir)
    new_temp_dir = os.path.join(parent_dir,
                                prefix + get_unique_hexa_identifier())
    if not mkdir_p(new_temp_dir, nowarning=True):
        # The path is interpolated here: exceptions (unlike logging calls)
        # do not apply %-formatting to extra positional arguments.
        raise MFUtilException("can't create temp_dir: %s" % new_temp_dir)
    return new_temp_dir
def get_ipv4_for_hostname(hostname, static_mappings={}):
    """Translate a host name to IPv4 address format.

    The IPv4 address is returned as a string, such as '100.50.200.5'.
    If the host name is an IPv4 address itself it is returned unchanged.

    You can provide a dictionary with static mappings.
    Following mappings are always added (and take precedence over the
    provided ones):
    '127.0.0.1' => '127.0.0.1'
    'localhost' => '127.0.0.1'
    'localhost.localdomain' => '127.0.0.1'

    The static_mappings argument is never modified.

    Args:
        hostname (string): hostname (compared in lowercase).
        static_mappings (dict): dictionary of static mappings
            ((hostname) string: (ip) string).

    Returns:
        (string) IPv4 address for the given hostname (None if any problem)

    """
    hostname = hostname.lower()
    # Work on a private copy: updating the argument in place would leak the
    # default entries into the caller's dict and into the shared default
    # value of the parameter.
    mappings = dict(static_mappings) if static_mappings else {}
    mappings.update({'127.0.0.1': '127.0.0.1', 'localhost': '127.0.0.1',
                     'localhost.localdomain': '127.0.0.1'})
    if hostname in mappings:
        return mappings[hostname]
    try:
        return socket.gethostbyname(hostname)
    except Exception:
        # Documented best-effort contract: any resolution problem gives None
        return None
def get_recursive_mtime(directory, ignores=[]):
    """Return the most recent mtime found recursively under a directory.

    Only the entries below the directory are examined (files directly,
    sub-directories through a recursive call). An entry whose mtime can't
    be read counts as 0.

    Args:
        directory (string): complete path of a directory to scan.
        ignores (list of strings): list of shell-style wildcards
            defining which filenames/dirnames to ignore (see fnmatch).

    Returns:
        (int) timestamp of the latest mtime on the directory (0 if nothing
            was found).

    """
    latest = 0
    for entry in os.listdir(directory):
        if any(fnmatch.fnmatch(entry, pattern) for pattern in ignores):
            continue
        entry_path = os.path.join(directory, entry)
        if os.path.isdir(entry_path):
            entry_mtime = get_recursive_mtime(entry_path, ignores=ignores)
        else:
            try:
                entry_mtime = int(os.path.getmtime(entry_path))
            except Exception:
                # unreadable entry: it must not influence the result
                entry_mtime = 0
        latest = max(latest, entry_mtime)
    return latest
def add_inotify_watch(inotify, directory, ignores=[]):
    """Register a directory and, recursively, its sub-directories for watching.

    A failure to add a watch is logged as a warning and does not stop the
    traversal. A directory that is not readable is logged and not entered.

    Args:
        inotify (inotify object): object that owns the file descriptors
        directory (string): complete path of a directory to scan.
        ignores (list of strings): list of shell-style wildcards
            defining which filenames/dirnames to ignore (see fnmatch).

    """
    mask = (flags.MODIFY | flags.CREATE
            | flags.DELETE | flags.DELETE_SELF)
    try:
        __get_logger().info("watch %s" % directory)
        inotify.add_watch(directory, mask)
    except Exception as error:
        __get_logger().warning("cannot watch %s: %s" % (directory, error))

    if not os.access(directory, os.R_OK):
        __get_logger().warning("cannot enter into %s" % directory)
        return

    for entry in os.listdir(directory):
        if any(fnmatch.fnmatch(entry, pattern) for pattern in ignores):
            continue
        child_path = os.path.join(directory, entry)
        if os.path.isdir(child_path):
            add_inotify_watch(inotify, child_path, ignores=ignores)
def _kill_process_and_children(process):
    """Kill a psutil process, then its children, recursively.

    Processes that have already disappeared are silently skipped.

    Args:
        process: process object exposing children() and kill()
            (a psutil.Process in kill_process_and_children()).

    """
    # Snapshot the complete descendant tree before killing anything
    try:
        descendants = process.children(recursive=True)
    except psutil.NoSuchProcess:
        descendants = None
    # Immediate children only, to kill the tree in the "good" order
    try:
        direct_children = process.children(recursive=False)
    except psutil.NoSuchProcess:
        direct_children = None
    try:
        process.kill()
    except psutil.NoSuchProcess:
        pass
    for child in direct_children or ():
        _kill_process_and_children(child)
    # Second pass over the initial full list, to be sure nothing was missed
    for child in descendants or ():
        _kill_process_and_children(child)
def kill_process_and_children(pid):
    """Kill a complete tree of processes, recursively.

    Given a pid, the process, its children, the children of each child...
    are all killed. Nothing is done if no process has this pid.

    The SIGKILL signal is used.

    Args:
        pid (int): process PID to kill.

    """
    try:
        root_process = psutil.Process(pid)
    except psutil.NoSuchProcess:
        # nothing to kill
        pass
    else:
        _kill_process_and_children(root_process)
def hash_generator(*args):
    """Build a hash (as a safe string) from a variable number of arguments.

    Note that pickle is used, so arguments have to be serializable.

    Args:
        *args: arguments to hash

    Returns:
        (string) hexadecimal MD5 digest of the pickled arguments.

    """
    serialized = pickle.dumps(args, protocol=pickle.HIGHEST_PROTOCOL)
    digest = hashlib.md5(serialized)
    return digest.hexdigest()
Functions
def create_tmp_dirpath(tmp_dir=None, prefix='')
-
Create and return a temporary directory inside a parent temporary directory.
The dirname is made with get_unique_hexa_identifier() identifier so 32 hexa characters.
The parent directory can be provided by the user (or be the system one). It will be created if necessary. An exception can be raised if there is any problem at this stage.
Note: the temporary directory is created.
Args
tmp_dir
:string
- user provided tmp directory (None to use the system temp dir).
prefix
:string
- you can add here a prefix for dirnames (will be prepended before the 32 hexa characters).
Returns
(string) complete path of a newly created temporary directory.
Raises
MFUtilException if the temp directory can't be created.
Expand source code
def create_tmp_dirpath(tmp_dir=None, prefix=""):
    """Create and return a temporary directory inside a parent temp directory.

    The directory name is made with a get_unique_hexa_identifier()
    identifier, so 32 hexa characters (after the optional prefix).

    The parent directory can be provided by the user (or be the system one).
    It is created if necessary; an exception can be raised if there is
    any problem at this stage.

    Note: the temporary directory is created.

    Args:
        tmp_dir (string): user provided tmp directory (None
            to use the system temp dir).
        prefix (string): prefix for the directory name
            (prepended before the 32 hexa characters).

    Returns:
        (string) complete path of a newly created temporary directory.

    Raises:
        MFUtilException if the temp directory can't be created.

    """
    parent_dir = _get_temp_dir(tmp_dir)
    new_temp_dir = os.path.join(parent_dir,
                                prefix + get_unique_hexa_identifier())
    if not mkdir_p(new_temp_dir, nowarning=True):
        # The path is interpolated here: exceptions (unlike logging calls)
        # do not apply %-formatting to extra positional arguments.
        raise MFUtilException("can't create temp_dir: %s" % new_temp_dir)
    return new_temp_dir
def eval(expr, variables=None)
-
Evaluate (safely) a python expression (as a string).
The eval is done with simpleeval library.
Following functions are available (in expressions):
- re_match: see match() function of re module
- re_imatch: insensitive match() function of re module
- fnmatch.fnmatch: fnmatch() function of fnmatch module
Args
expr
:string
- the python expression to eval.
variables
:dict
- if set, inject some variables/values in the expression.
Expand source code
def eval(expr, variables=None):
    """Evaluate a python expression given as a string inside a sandbox.

    The evaluation is delegated to SandboxedEval (built on top of the
    simpleeval library).

    Following functions are available (in expressions):

    - re_match: see match() function of re module
    - re_imatch: insensitive match() function of re module
    - fnmatch.fnmatch: fnmatch() function of fnmatch module

    Args:
        expr (string): the python expression to evaluate.
        variables (dict): if set, names/values injected in the expression.

    Returns:
        the value produced by the sandboxed evaluation of expr.

    """
    sandbox = SandboxedEval(names=variables)
    outcome = sandbox.eval(expr)
    return outcome
def get_ipv4_for_hostname(hostname, static_mappings={})
-
Translate a host name to IPv4 address format.
The IPv4 address is returned as a string, such as '100.50.200.5'. If the host name is an IPv4 address itself it is returned unchanged.
You can provide a dictionary with static mappings. The following mappings are added by default: '127.0.0.1' => '127.0.0.1', 'localhost' => '127.0.0.1', 'localhost.localdomain' => '127.0.0.1'.
Args
hostname
:string
- hostname.
static_mappings
:dict
- dictionary of static mappings ((hostname) string: (ip) string).
Returns
(string) IPv4 address for the given hostname (None if any problem)
Expand source code
def get_ipv4_for_hostname(hostname, static_mappings={}):
    """Translate a host name to IPv4 address format.

    The IPv4 address is returned as a string, such as '100.50.200.5'.
    If the host name is an IPv4 address itself it is returned unchanged.

    You can provide a dictionary with static mappings.
    Following mappings are always added (and take precedence over the
    provided ones):
    '127.0.0.1' => '127.0.0.1'
    'localhost' => '127.0.0.1'
    'localhost.localdomain' => '127.0.0.1'

    The static_mappings argument is never modified.

    Args:
        hostname (string): hostname (compared in lowercase).
        static_mappings (dict): dictionary of static mappings
            ((hostname) string: (ip) string).

    Returns:
        (string) IPv4 address for the given hostname (None if any problem)

    """
    hostname = hostname.lower()
    # Work on a private copy: updating the argument in place would leak the
    # default entries into the caller's dict and into the shared default
    # value of the parameter.
    mappings = dict(static_mappings) if static_mappings else {}
    mappings.update({'127.0.0.1': '127.0.0.1', 'localhost': '127.0.0.1',
                     'localhost.localdomain': '127.0.0.1'})
    if hostname in mappings:
        return mappings[hostname]
    try:
        return socket.gethostbyname(hostname)
    except Exception:
        # Documented best-effort contract: any resolution problem gives None
        return None
def get_recursive_mtime(directory, ignores=[])
-
Get the latest mtime recursively on a directory.
Args
directory
:string
- complete path of a directory to scan.
ignores
:list
of strings
- list of shell-style wildcards to define which filenames/dirnames to ignore (see fnmatch).
Returns
(int) timestamp of the latest mtime on the directory.
Expand source code
def get_recursive_mtime(directory, ignores=[]):
    """Return the most recent mtime found recursively under a directory.

    Only the entries below the directory are examined (files directly,
    sub-directories through a recursive call). An entry whose mtime can't
    be read counts as 0.

    Args:
        directory (string): complete path of a directory to scan.
        ignores (list of strings): list of shell-style wildcards
            defining which filenames/dirnames to ignore (see fnmatch).

    Returns:
        (int) timestamp of the latest mtime on the directory (0 if nothing
            was found).

    """
    latest = 0
    for entry in os.listdir(directory):
        if any(fnmatch.fnmatch(entry, pattern) for pattern in ignores):
            continue
        entry_path = os.path.join(directory, entry)
        if os.path.isdir(entry_path):
            entry_mtime = get_recursive_mtime(entry_path, ignores=ignores)
        else:
            try:
                entry_mtime = int(os.path.getmtime(entry_path))
            except Exception:
                # unreadable entry: it must not influence the result
                entry_mtime = 0
        latest = max(latest, entry_mtime)
    return latest
def get_tmp_filepath(tmp_dir=None, prefix='')
-
Return a tmp (complete) filepath.
The filename is made with get_unique_hexa_identifier() identifier so 32 hexa characters.
The directory can be provided by the user (or be the system one). It will be created if necessary. An exception can be raised if there is any problem at this stage.
Note: the file is not created or open at all. The function just returns a filename.
Args
tmp_dir
:string
- user provided tmp directory (None to use the system temp dir).
prefix
:string
- you can add here a prefix for filenames (will be prepended before the 32 hexa characters).
Returns
(string) tmp (complete) filepath.
Raises
MFUtilException if the temp directory is not good or can't be created.
Expand source code
def get_tmp_filepath(tmp_dir=None, prefix=""):
    """Build and return a complete temporary filepath.

    The filename is made with a get_unique_hexa_identifier() identifier,
    so 32 hexa characters (after the optional prefix).

    The directory can be provided by the user (or be the system one).
    It is created if necessary; an exception can be raised if there is
    any problem at this stage.

    Note: the file itself is neither created nor opened. The function
    just returns a path.

    Args:
        tmp_dir (string): user provided tmp directory (None
            to use the system temp dir).
        prefix (string): prefix for the filename
            (prepended before the 32 hexa characters).

    Returns:
        (string) tmp (complete) filepath.

    Raises:
        MFUtilException if the temp directory is not good or can't
            be created.

    """
    parent_dir = _get_temp_dir(tmp_dir)
    filename = prefix + get_unique_hexa_identifier()
    return os.path.join(parent_dir, filename)
def get_unique_hexa_identifier()
-
Return a unique hexadecimal identifier of 32 characters.
The identifier is made only with 0123456789abcdef characters.
Returns
(string) unique hexa identifier.
Expand source code
def get_unique_hexa_identifier():
    """Return a random unique identifier made of 32 hexadecimal characters.

    The identifier only contains 0123456789abcdef characters (it is a
    random UUID4 without its dashes).

    Returns:
        (string) unique hexa identifier.

    """
    # UUID.hex is the dash-less, lowercase form of str(uuid)
    return uuid.uuid4().hex
def get_utc_unix_timestamp()
-
Return the current unix UTC timestamp on all platforms.
It works even if the machine is configured in local time.
Returns
(int) a int corresponding to the current unix utc timestamp.
Expand source code
def get_utc_unix_timestamp():
    """Return the current unix UTC timestamp on all platforms.

    It works even if the machine is configured in local time.

    Returns:
        (int) the current unix timestamp (whole seconds since the epoch, UTC).

    """
    # time.time() already counts seconds since the epoch (UTC), independently
    # of the local timezone. The previous utcnow() + time.mktime() combination
    # interpreted a UTC broken-down time as *local* time, so the result was
    # shifted by the UTC offset on every machine not configured in UTC.
    return int(time.time())
def hash_generator(*args)
-
Generate a hash from a variable number of arguments as a safe string.
Note that pickle is used so arguments have to be serializable.
Args
*args
- arguments to hash
Expand source code
def hash_generator(*args):
    """Build a hash (as a safe string) from a variable number of arguments.

    Note that pickle is used, so arguments have to be serializable.

    Args:
        *args: arguments to hash

    Returns:
        (string) hexadecimal MD5 digest of the pickled arguments.

    """
    serialized = pickle.dumps(args, protocol=pickle.HIGHEST_PROTOCOL)
    digest = hashlib.md5(serialized)
    return digest.hexdigest()
def kill_process_and_children(pid)
-
Kill recursively a complete tree of processes.
Given a pid, this method recursively kills the complete tree (children and children of each child…) of this process.
The SIGKILL signal is used.
Args
pid
:int
- process PID to kill.
Expand source code
def kill_process_and_children(pid):
    """Kill a complete tree of processes, recursively.

    Given a pid, the process, its children, the children of each child...
    are all killed. Nothing is done if no process has this pid.

    The SIGKILL signal is used.

    Args:
        pid (int): process PID to kill.

    """
    try:
        root_process = psutil.Process(pid)
    except psutil.NoSuchProcess:
        # nothing to kill
        pass
    else:
        _kill_process_and_children(root_process)
def mkdir_p(path, nodebug=False, nowarning=False)
-
Make a directory recursively (clone of mkdir -p).
Thanks to http://stackoverflow.com/questions/600268/ mkdir-p-functionality-in-python .
Any exceptions are caught and a warning message is logged in case of problems.
If the directory already exists, True is returned with no debug or warning.
Args
path
:string
- complete path to create.
nodebug
:boolean
- if True, no debug messages are logged.
nowarning
:boolean
- if True, no messages are logged in case of problems.
Returns
boolean
- True if the directory exists at the end.
Expand source code
def mkdir_p(path, nodebug=False, nowarning=False):
    """Create a directory and its missing parents (clone of mkdir -p).

    Thanks to http://stackoverflow.com/questions/600268/
        mkdir-p-functionality-in-python .

    OSError exceptions are caught and a warning message is logged in case
    of problems.

    If the directory already exists, True is returned with no debug or
    warning.

    Args:
        path (string): complete path to create.
        nodebug (boolean): currently unused (no debug message is logged).
        nowarning (boolean): if True, no message is logged in
            case of problems.

    Returns:
        boolean: True if the directory exists at the end.

    """
    try:
        os.makedirs(path)
    except OSError as error:
        already_there = (error.errno == errno.EEXIST
                         and os.path.isdir(path))
        if already_there:
            return True
        if not nowarning:
            __get_logger().warning("can't create %s directory", path)
        return False
    else:
        # No debug message here: the creation is already logged through the
        # circus configuration file.
        return True
def mkdir_p_or_die(path, nodebug=False, exit_code=2)
-
Make a directory recursively (clone of mkdir -p).
If the directory already exists, the function simply returns, with no debug or warning message.
Any exceptions are caught.
In case of problems, the program dies here with corresponding exit_code.
Args
path
:string
- complete path to create.
nodebug
:boolean
- if True, no debug messages are logged.
exit_code
:int
- os._exit() exit code.
Expand source code
def mkdir_p_or_die(path, nodebug=False, exit_code=2):
    """Create a directory recursively or terminate the program.

    If the directory already exists (or has just been created), the
    function simply returns.

    Creation problems are handled by mkdir_p(); when it reports a failure,
    an error is logged and the program dies here with the given exit_code.

    Args:
        path (string): complete path to create.
        nodebug (boolean): forwarded to mkdir_p().
        exit_code (int): os._exit() exit code.

    """
    created = mkdir_p(path, nodebug=nodebug, nowarning=True)
    if created:
        return
    __get_logger().error("can't create %s directory", path)
    # os._exit() leaves immediately, without running cleanup handlers
    os._exit(exit_code)