Coverage for mfutil/misc.py: 58%

142 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-13 15:33 +0000

1"""Generic utility classes and functions.""" 

2 

3import uuid 

4import errno 

5import os 

6import logging 

7import tempfile 

8import socket 

9import psutil 

10import pickle 

11import hashlib 

12from datetime import datetime, timezone 

13import time 

14import fnmatch 

15 

16from inotify_simple import flags 

17from mfutil.exc import MFUtilException 

18from mfutil.eval import SandboxedEval 

19 

# Per-name overrides for the pdoc documentation generator: mapping a name to
# False follows pdoc's convention for leaving that object out of the
# generated documentation.
__pdoc__ = {"add_inotify_watch": False}

23 

24 

def __get_logger():
    """Return the logger used by this module (registered as "mfutil")."""
    logger_name = "mfutil"
    return logging.getLogger(logger_name)

27 

28 

def eval(expr, variables=None):
    """Safely evaluate a python expression given as a string.

    The evaluation is delegated to SandboxedEval (built on the simpleeval
    library).

    The following functions can be used inside expressions:

    - re_match: see match() function of re module
    - re_imatch: case-insensitive match() function of re module
    - fnmatch.fnmatch: fnmatch() function of fnmatch module

    Args:
        expr (string): the python expression to evaluate.
        variables (dict): optional names/values made available
            to the expression.

    Returns:
        the value produced by the evaluation of the expression.

    """
    return SandboxedEval(names=variables).eval(expr)

49 

50 

def get_unique_hexa_identifier():
    """Return a unique identifier made of 32 hexadecimal characters.

    The identifier is a random (version 4) UUID written without dashes,
    so it only contains characters from 0123456789abcdef.

    Returns:
        (string) unique hexa identifier.

    """
    return uuid.uuid4().hex

62 

63 

def get_utc_unix_timestamp():
    """Return the current unix (UTC) timestamp on all platforms.

    The result does not depend on the timezone configured on the machine.

    Returns:
        (int) number of whole seconds elapsed since the unix epoch
            (1970-01-01 00:00:00 UTC).

    """
    # time.time() is already expressed in seconds since the (UTC) epoch.
    # The previous implementation converted a naive UTC datetime with
    # time.mktime(), which interprets its argument as *local* time and
    # therefore shifted the result by the local UTC offset on machines
    # not configured in UTC.
    return int(time.time())

82 

83 

def mkdir_p(path, nodebug=False, nowarning=False):
    """Create a directory and its missing parents (like mkdir -p).

    Inspired by http://stackoverflow.com/questions/600268/
    mkdir-p-functionality-in-python .

    OSError exceptions are caught; unless nowarning is set, a warning
    message is logged when the directory can't be created.

    If the directory already exists, True is returned without logging
    anything.

    Args:
        path (string): complete path to create.
        nodebug (boolean): currently unused (no debug message is logged
            by this function).
        nowarning (boolean): if True, no message is logged in
            case of problems.

    Returns:
        boolean: True if the directory exists at the end.

    """
    try:
        os.makedirs(path)
    except OSError as error:
        # Somebody else may have created the directory in the meantime:
        # this is a success as long as it really is a directory.
        if error.errno == errno.EEXIST and os.path.isdir(path):
            return True
        if nowarning:
            return False
        __get_logger().warning("can't create %s directory", path)
        return False
    # No debug message here on purpose: the creation is already logged
    # through the circus configuration file.
    return True

120 

121 

def mkdir_p_or_die(path, nodebug=False, exit_code=2):
    """Create a directory recursively or terminate the program.

    The creation itself is delegated to mkdir_p() (with its warnings
    disabled), so an already existing directory is a success.

    If the directory can't be created, an error message is logged and
    the process exits immediately through os._exit() with exit_code.

    Args:
        path (string): complete path to create.
        nodebug (boolean): forwarded to mkdir_p().
        exit_code (int): os._exit() exit code.

    """
    if mkdir_p(path, nodebug=nodebug, nowarning=True):
        return
    __get_logger().error("can't create %s directory", path)
    os._exit(exit_code)

143 

144 

def _get_temp_dir(tmp_dir=None):
    """Return the system temp dir or the user chosen temp dir.

    If the user provides a tmp_dir argument, the
    directory is created (if necessary).

    If the user doesn't provide a tmp_dir argument,
    the function returns the system temp dir.

    If the directory can't be created, an exception is raised.

    Args:
        tmp_dir (string): user provided tmp directory (None
            to use the system temp dir).

    Returns:
        (string) temp directory

    Raises:
        MFUtilException if the temp directory can't be created.

    """
    if tmp_dir is None:
        tmp_dir = tempfile.gettempdir()
    if not mkdir_p(tmp_dir):
        # Interpolate the message ourselves: exception constructors (unlike
        # logging calls) do not apply %-formatting to extra arguments.
        raise MFUtilException("can't create temp_dir: %s" % (tmp_dir,))
    return tmp_dir

175 

176 

def get_tmp_filepath(tmp_dir=None, prefix=""):
    """Build the complete path of a new temporary file.

    The filename is the given prefix followed by a
    get_unique_hexa_identifier() identifier (32 hexa characters).

    The parent directory can be provided by the user (or be the system
    one). It is created if necessary and an exception is raised if that
    fails.

    Note: the file itself is neither created nor opened. The function
    only returns a path.

    Args:
        tmp_dir (string): user provided tmp directory (None
            to use the system temp dir).
        prefix (string): optional prefix for the filename
            (prepended before the 32 hexa characters).

    Returns:
        (string) tmp (complete) filepath.

    Raises:
        MFUtilException if the temp directory can't be created.

    """
    parent_dir = _get_temp_dir(tmp_dir)
    filename = prefix + get_unique_hexa_identifier()
    return os.path.join(parent_dir, filename)

206 

207 

def create_tmp_dirpath(tmp_dir=None, prefix=""):
    """Create and return a temporary directory inside a parent
    temporary directory.

    The dirname is made of the given prefix followed by a
    get_unique_hexa_identifier() identifier (32 hexa characters).

    The parent directory can be provided by the user (or be the system
    one). It is created if necessary and an exception is raised if that
    fails.

    Note: the temporary directory is created.

    Args:
        tmp_dir (string): user provided tmp directory (None
            to use the system temp dir).
        prefix (string): optional prefix for the dirname
            (prepended before the 32 hexa characters).

    Returns:
        (string) complete path of a newly created temporary directory.

    Raises:
        MFUtilException if the temp directory can't be created.

    """
    temp_dir = _get_temp_dir(tmp_dir)
    new_temp_dir = os.path.join(temp_dir,
                                prefix + get_unique_hexa_identifier())
    if not mkdir_p(new_temp_dir, nowarning=True):
        # Interpolate the message ourselves: exception constructors (unlike
        # logging calls) do not apply %-formatting to extra arguments.
        raise MFUtilException("can't create temp_dir: %s" % (new_temp_dir,))
    return new_temp_dir

241 

242 

def get_ipv4_for_hostname(hostname, static_mappings=None):
    """Translate a host name to IPv4 address format.

    The IPv4 address is returned as a string, such as '100.50.200.5'.
    If the host name is an IPv4 address itself it is returned unchanged.

    You can provide a dictionary with static mappings. The dictionary
    given by the caller is never modified.
    The following mappings are always added (and take precedence over
    user provided entries for the same names):
    '127.0.0.1' => '127.0.0.1'
    'localhost' => '127.0.0.1'
    'localhost.localdomain' => '127.0.0.1'

    Args:
        hostname (string): hostname (lowercased before any lookup).
        static_mappings (dict): dictionary of static mappings
            ((hostname) string: (ip) string), None for no extra mapping.

    Returns:
        (string) IPv4 address for the given hostname (None if any problem)

    """
    hostname = hostname.lower()
    # Work on a private copy: updating the argument in place would leak the
    # built-in entries into the caller's dict (or into a shared default).
    mappings = dict(static_mappings) if static_mappings else {}
    mappings.update({'127.0.0.1': '127.0.0.1', 'localhost': '127.0.0.1',
                     'localhost.localdomain': '127.0.0.1'})
    if hostname in mappings:
        return mappings[hostname]
    try:
        return socket.gethostbyname(hostname)
    except Exception:
        # Deliberate best effort: any resolution failure is reported as None.
        return None

273 

274 

def get_recursive_mtime(directory, ignores=[]):
    """Return the most recent mtime found recursively under a directory.

    Only entries found below the directory are considered: regular entries
    contribute their own mtime, sub-directories contribute the result of
    the recursive scan of their content.

    Args:
        directory (string): complete path of a directory to scan.
        ignores (list of strings): list of shell-style wildcards
            selecting the filenames/dirnames to skip (see fnmatch).

    Returns:
        (int) timestamp of the latest mtime found (0 if nothing was found).

    """
    latest = 0
    for entry in os.listdir(directory):
        if any(fnmatch.fnmatch(entry, pattern) for pattern in ignores):
            continue
        entry_path = os.path.join(directory, entry)
        if os.path.isdir(entry_path):
            candidate = get_recursive_mtime(entry_path, ignores=ignores)
        else:
            try:
                candidate = int(os.path.getmtime(entry_path))
            except Exception:
                # The entry may have vanished or be unreadable: it simply
                # does not contribute to the result.
                candidate = 0
        latest = max(latest, candidate)
    return latest

308 

309 

def add_inotify_watch(inotify, directory, ignores=[]):
    """Recursively register a directory tree on an inotify object.

    A watch (modify, create, delete and delete-self events) is requested
    for the directory itself, then for each of its sub-directories whose
    name does not match an ignore pattern. A failure to add a watch is
    logged as a warning and does not stop the traversal.

    Args:
        inotify (inotify object): object that owns the file descriptors
        directory (string): complete path of a directory to scan.
        ignores (list of strings): list of shell-style wildcards
            selecting the filenames/dirnames to skip (see fnmatch).

    """
    event_mask = (flags.MODIFY | flags.CREATE
                  | flags.DELETE | flags.DELETE_SELF)
    try:
        __get_logger().info("watch %s" % directory)
        inotify.add_watch(directory, event_mask)
    except Exception as e:
        __get_logger().warning("cannot watch %s: %s" % (directory, e))

    # Without read access the content can't be listed: stop descending here.
    if not os.access(directory, os.R_OK):
        __get_logger().warning("cannot enter into %s" % directory)
        return

    for entry in os.listdir(directory):
        if any(fnmatch.fnmatch(entry, pattern) for pattern in ignores):
            continue
        entry_path = os.path.join(directory, entry)
        if os.path.isdir(entry_path):
            add_inotify_watch(inotify, entry_path, ignores=ignores)

343 

344 

def _kill_process_and_children(process):
    """Kill a process, then (recursively) its children.

    Args:
        process: process object exposing children(recursive=...) and
            kill() (a psutil.Process in practice).

    """
    # Take a snapshot of the complete subtree before killing anything.
    try:
        descendants = process.children(recursive=True)
    except psutil.NoSuchProcess:
        descendants = None
    # Immediate children only: they are handled first so that the tree is
    # killed in the "good" order.
    try:
        direct_children = process.children(recursive=False)
    except psutil.NoSuchProcess:
        direct_children = None
    try:
        process.kill()
    except psutil.NoSuchProcess:
        pass
    # Second pass on the initial full snapshot, to be sure that nothing was
    # missed while walking through the immediate children.
    for group in (direct_children, descendants):
        if group is None:
            continue
        for child in group:
            _kill_process_and_children(child)

369 

370 

def kill_process_and_children(pid):
    """Kill a complete tree of processes.

    Given a pid, the corresponding process is killed together with its
    children, the children of each child, and so on.

    The kill() method of psutil is used (SIGKILL signal).

    Nothing is done if there is no process for the given pid.

    Args:
        pid (int): process PID to kill.

    """
    try:
        root_process = psutil.Process(pid)
    except psutil.NoSuchProcess:
        pass
    else:
        _kill_process_and_children(root_process)

388 

389 

def hash_generator(*args):
    """Build a hash (as a safe string) from any number of arguments.

    The arguments are serialized with pickle (so they have to be
    picklable) and the hexadecimal MD5 digest of the serialized bytes is
    returned.

    Args:
        *args: arguments to hash

    Returns:
        (string) hexadecimal digest (32 characters).

    """
    serialized = pickle.dumps(args, pickle.HIGHEST_PROTOCOL)
    digest = hashlib.md5()
    digest.update(serialized)
    return digest.hexdigest()