Coverage for mfutil/misc.py: 58%
142 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-13 15:33 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-13 15:33 +0000
1"""Generic utility classes and functions."""
3import uuid
4import errno
5import os
6import logging
7import tempfile
8import socket
9import psutil
10import pickle
11import hashlib
12from datetime import datetime, timezone
13import time
14import fnmatch
16from inotify_simple import flags
17from mfutil.exc import MFUtilException
18from mfutil.eval import SandboxedEval
20__pdoc__ = {
21 "add_inotify_watch": False
22}
25def __get_logger():
26 return logging.getLogger("mfutil")
29def eval(expr, variables=None):
30 """Evaluate (safely) a python expression (as a string).
32 The eval is done with simpleeval library.
34 Following functions are available (in expressions):
36 - re_match: see match() function of re module
37 - re_imatch: insensitive match() function of re module
38 - fnmatch.fnmatch: fnmatch() function of fnmatch module
40 Args:
41 expr (string): the python expression to eval.
42 variables (dict): if set, inject some variables/values
43 in the expression.
45 """
47 s = SandboxedEval(names=variables)
48 return s.eval(expr)
51def get_unique_hexa_identifier():
52 """Return an unique hexa identifier on 32 bytes.
54 The idenfier is made only with 0123456789abcdef
55 characters.
57 Returns:
58 (string) unique hexa identifier.
60 """
61 return str(uuid.uuid4()).replace('-', '')
64def get_utc_unix_timestamp():
65 """Return the current unix UTC timestamp on all platforms.
67 It works even if the machine is configured in local time.
69 Returns:
70 (int) a int corresponding to the current unix utc timestamp.
72 """
74 # utcnow() is deprecated and should be replaced by now(datetime.UTC)
75 # (for python >= 3.11)
76 try:
77 dts = datetime.now(timezone.utc).replace(tzinfo=None)
78 except Exception:
79 dts = datetime.utcnow()
81 return int(time.mktime(dts.timetuple()))
84def mkdir_p(path, nodebug=False, nowarning=False):
85 """Make a directory recursively (clone of mkdir -p).
87 Thanks to http://stackoverflow.com/questions/600268/
88 mkdir-p-functionality-in-python .
90 Any exceptions are catched and a warning message
91 is logged in case of problems.
93 If the directory already exists, True is returned
94 with no debug or warning.
96 Args:
97 path (string): complete path to create.
98 nodebug (boolean): if True, no debug messages are logged.
99 nowarning (boolean): if True, no message are logged in
100 case of problems.
102 Returns:
103 boolean: True if the directory exists at the end.
105 """
106 try:
107 os.makedirs(path)
108 except OSError as exc:
109 if exc.errno == errno.EEXIST and os.path.isdir(path):
110 return True
111 else:
112 if not nowarning:
113 __get_logger().warning("can't create %s directory", path)
114 return False
115 # We do not log debug message because it is logged in circus
116 # configuration file
117 # if not nodebug:
118 # __get_logger().debug("%s directory created", path)
119 return True
122def mkdir_p_or_die(path, nodebug=False, exit_code=2):
123 """Make a directory recursively (clone of mkdir -p).
125 If the directory already exists, True is returned
126 with no debug or warning.
128 Any exceptions are catched.
130 In case of problems, the program dies here with corresponding
131 exit_code.
133 Args:
134 path (string): complete path to create.
135 nodebug (boolean): if True, no debug messages are logged.
136 exit_code (int): os._exit() exit code.
138 """
139 res = mkdir_p(path, nodebug=nodebug, nowarning=True)
140 if not res:
141 __get_logger().error("can't create %s directory", path)
142 os._exit(exit_code)
145def _get_temp_dir(tmp_dir=None):
146 """Return system temp dir or used choosen temp dir.
148 If the user provides a tmp_dir argument, the
149 directory is created (if necessary).
151 If the user don't provide a tmp_dir argument,
152 the function returns a system temp dir.
154 If the directory is not good or can be created,
155 an exception is raised.
157 Args:
158 tmp_dir (string): user provided tmp directory (None
159 to use the system temp dir).
161 Returns:
162 (string) temp directory
164 Raises:
165 MFUtilException if the temp directory is not good or can't
166 be created.
168 """
169 if tmp_dir is None:
170 tmp_dir = tempfile.gettempdir()
171 res = mkdir_p(tmp_dir)
172 if not res:
173 raise MFUtilException("can't create temp_dir: %s", tmp_dir)
174 return tmp_dir
177def get_tmp_filepath(tmp_dir=None, prefix=""):
178 """Return a tmp (complete) filepath.
180 The filename is made with get_unique_hexa_identifier() identifier
181 so 32 hexa characters.
183 The dirname can be provided by the user (or be a system one).
184 He will be created if necessary. An exception can be raised if any
185 problems at this side.
187 Note: the file is not created or open at all. The function just
188 returns a filename.
190 Args:
191 tmp_dir (string): user provided tmp directory (None
192 to use the system temp dir).
193 prefix (string): you can add here a prefix for filenames
194 (will be preprended before the 32 hexa characters).
196 Returns:
197 (string) tmp (complete) filepath.
199 Raises:
200 MFUtilException if the temp directory is not good or can't
201 be created.
203 """
204 temp_dir = _get_temp_dir(tmp_dir)
205 return os.path.join(temp_dir, prefix + get_unique_hexa_identifier())
208def create_tmp_dirpath(tmp_dir=None, prefix=""):
209 """Create and return a temporary directory inside a father
210 tempory directory.
212 The dirname is made with get_unique_hexa_identifier() identifier
213 so 32 hexa characters.
215 The father dirname can be provided by the user (or be a system one).
216 He will be created if necessary. An exception can be raised if any
217 problems at this side.
219 Note: the temporary directory is created.
221 Args:
222 tmp_dir (string): user provided tmp directory (None
223 to use the system temp dir).
224 prefix (string): you can add here a prefix for dirnames
225 (will be preprended before the 32 hexa characters).
227 Returns:
228 (string) complete path of a newly created temporary directory.
230 Raises:
231 MFUtilException if the temp directory can't be created.
233 """
234 temp_dir = _get_temp_dir(tmp_dir)
235 new_temp_dir = os.path.join(temp_dir,
236 prefix + get_unique_hexa_identifier())
237 res = mkdir_p(new_temp_dir, nowarning=True)
238 if not res:
239 raise MFUtilException("can't create temp_dir: %s", new_temp_dir)
240 return new_temp_dir
243def get_ipv4_for_hostname(hostname, static_mappings={}):
244 """Translate a host name to IPv4 address format.
246 The IPv4 address is returned as a string, such as '100.50.200.5'.
247 If the host name is an IPv4 address itself it is returned unchanged.
249 You can provide a dictionnary with static mappings.
250 Following mappings are added by default:
251 '127.0.0.1' => '127.0.0.1'
252 'localhost' => '127.0.0.1'
253 'localhost.localdomain' => '127.0.0.1'
255 Args:
256 hostname (string): hostname.
257 static_mappings (dict): dictionnary of static mappings
258 ((hostname) string: (ip) string).
260 Returns:
261 (string) IPv4 address for the given hostname (None if any problem)
263 """
264 hostname = hostname.lower()
265 static_mappings.update({'127.0.0.1': '127.0.0.1', 'localhost': '127.0.0.1',
266 'localhost.localdomain': '127.0.0.1'})
267 if hostname in static_mappings:
268 return static_mappings[hostname]
269 try:
270 return socket.gethostbyname(hostname)
271 except Exception:
272 return None
275def get_recursive_mtime(directory, ignores=[]):
276 """Get the latest mtime recursivly on a directory.
278 Args:
279 directory (string): complete path of a directory to scan.
280 ignores (list of strings): list of shell-style wildcards
281 to define which filenames/dirnames to ignores (see fnmatch).
283 Returns:
284 (int) timestamp of the latest mtime on the directory.
286 """
287 result = 0
288 for name in os.listdir(directory):
289 ignored = False
290 for ssw in ignores:
291 if fnmatch.fnmatch(name, ssw):
292 ignored = True
293 break
294 if ignored:
295 continue
296 fullpath = os.path.join(directory, name)
297 if os.path.isdir(fullpath):
298 mtime = get_recursive_mtime(fullpath, ignores=ignores)
299 else:
300 mtime = 0
301 try:
302 mtime = int(os.path.getmtime(fullpath))
303 except Exception:
304 pass
305 if mtime > result:
306 result = mtime
307 return result
310def add_inotify_watch(inotify, directory, ignores=[]):
311 """Register recursively directories to watch.
313 Args:
314 inotify (inotify object): object that owns the file descriptors
315 directory (string): complete path of a directory to scan.
316 ignores (list of strings): list of shell-style wildcards
317 to define which filenames/dirnames to ignores (see fnmatch).
319 """
320 watch_flags = flags.MODIFY | flags.CREATE |\
321 flags.DELETE | flags.DELETE_SELF
322 try:
323 __get_logger().info("watch %s" % directory)
324 inotify.add_watch(directory, watch_flags)
325 except Exception as e:
326 __get_logger().warning("cannot watch %s: %s" % (directory, e))
328 if not os.access(directory, os.R_OK):
329 __get_logger().warning("cannot enter into %s" % directory)
330 return
332 for name in os.listdir(directory):
333 ignored = False
334 for ssw in ignores:
335 if fnmatch.fnmatch(name, ssw):
336 ignored = True
337 break
338 if ignored:
339 continue
340 fullpath = os.path.join(directory, name)
341 if os.path.isdir(fullpath):
342 add_inotify_watch(inotify, fullpath, ignores=ignores)
345def _kill_process_and_children(process):
346 children = None
347 all_children = None
348 # First we keep the full view of the process tree to kill
349 try:
350 all_children = process.children(recursive=True)
351 except psutil.NoSuchProcess:
352 pass
353 # Then, we keep just immediate children to kill in the "good" order
354 try:
355 children = process.children(recursive=False)
356 except psutil.NoSuchProcess:
357 pass
358 try:
359 process.kill()
360 except psutil.NoSuchProcess:
361 pass
362 if children is not None:
363 for child in children:
364 _kill_process_and_children(child)
365 # To be sure, we didn't miss something, we kill the initial full list
366 if all_children is not None:
367 for child in all_children:
368 _kill_process_and_children(child)
371def kill_process_and_children(pid):
372 """Kill recursively a complete tree of processes.
374 Given a pid, this method recursively kills the complete tree (children and
375 children of each child...) of this process.
377 The SIGKILL signal is used.
379 Args:
380 pid (int): process PID to kill.
382 """
383 try:
384 process = psutil.Process(pid)
385 except psutil.NoSuchProcess:
386 return
387 _kill_process_and_children(process)
390def hash_generator(*args):
391 """Generate a hash from a variable number of arguments as a safe string.
393 Note that pickle is used so arguments have to be serializable.
395 Args:
396 *args: arguments to hash
398 """
399 temp = pickle.dumps(args, pickle.HIGHEST_PROTOCOL)
400 return hashlib.md5(temp).hexdigest()