Coverage for mfplugin/configuration.py: 63%

240 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-13 15:57 +0000

1import os 

2import re 

3import copy 

4import sys 

5import fnmatch 

6import inspect 

7import opinionated_configparser 

8import cerberus 

9from mfplugin.utils import validate_configparser, \ 

10 cerberus_errors_to_human_string 

11from mfplugin.app import APP_SCHEMA, App 

12from mfplugin.extra_daemon import EXTRA_DAEMON_SCHEMA, ExtraDaemon 

13from mfplugin.utils import BadPlugin, resolve, get_current_envs, \ 

14 PluginEnvContextManager, NON_REQUIRED_BOOLEAN_DEFAULT_TRUE, \ 

15 NON_REQUIRED_STRING_DEFAULT_1, \ 

16 get_app_class, get_extra_daemon_class, get_nice_dump, is_jsonable, \ 

17 get_configuration_path, get_configuration_paths 

18 

19 

# Module identity and runtime location, read once at import time from the
# environment; the second argument is the fallback used when the variable
# is not set.
MFMODULE = os.environ.get("MFMODULE", "GENERIC")
MFMODULE_LOWERCASE = os.environ.get("MFMODULE_LOWERCASE", "generic")
MFMODULE_RUNTIME_HOME = os.environ.get("MFMODULE_RUNTIME_HOME", "/tmp")

23 

# Cerberus schema template for a plugin configuration, keyed by section name.
# Keys containing "*" (or "?") are fnmatch-style patterns: they are expanded
# by Configuration against the sections actually present in the parsed
# configuration before validation.
SCHEMA = {
    "general": {
        "required": True,
        "type": "dict",
        "schema": {
            "_version": {"required": True, "type": "string",
                         "regex": r"^[a-z0-9-_\.]+$"},
            "_release": {
                **NON_REQUIRED_STRING_DEFAULT_1
            },
            "_summary": {"required": True, "type": "string", "minlength": 1},
            "_license": {"required": True, "type": "string", "minlength": 1},
            "_url": {"required": True, "type": "string", "minlength": 1},
            "_maintainer": {"required": True, "type": "string",
                            "minlength": 1},
            "_vendor": {"required": True, "type": "string", "minlength": 1},
            "_add_plugin_dir_to_python_path": {
                **NON_REQUIRED_BOOLEAN_DEFAULT_TRUE
            }
        },
    },
    # One section per application; unknown options are rejected.
    "app_*": {
        "required": False,
        "type": "dict",
        "allow_unknown": False,
        "schema": {
            **APP_SCHEMA
        },
    },
    # One section per extra daemon; unknown options are rejected.
    "extra_daemon_*": {
        "required": False,
        "type": "dict",
        "allow_unknown": False,
        "schema": {
            **EXTRA_DAEMON_SCHEMA
        },
    },
    # Free-form sections: any option is accepted.
    "custom*": {
        "required": False,
        "type": "dict",
        "allow_unknown": True,
        "schema": {}
    },
}

68 

69 

class Configuration(object):
    """Lazily loaded, schema-validated view of a plugin configuration.

    Configuration files are parsed with OpinionatedConfigParser, validated
    with cerberus against the schema returned by get_schema() and exposed
    through properties. Nothing is read until load() runs; every accessor
    calls load() itself, so callers never have to.

    Extension points for subclasses: get_schema(), get_final_document()
    and after_load().
    """

    def __init__(self, plugin_name, plugin_home, config_filepath=None,
                 extra_daemon_class=None,
                 app_class=None,
                 dont_read_config_overrides=False):
        """Record where the configuration lives; no file is parsed here.

        Args:
            plugin_name: name of the plugin.
            plugin_home: home directory of the plugin.
            config_filepath: main configuration file; when None, the path
                returned by get_configuration_path(plugin_home) is used.
            extra_daemon_class: passed to get_extra_daemon_class() together
                with ExtraDaemon to pick the class built for each
                "extra_daemon_*" section.
            app_class: passed to get_app_class() together with App to pick
                the class built for each "app_*" / "step_*" section.
            dont_read_config_overrides: when True, only the path returned
                by get_configuration_path(plugin_home) is considered
                instead of the list from get_configuration_paths().

        Raises:
            BadPlugin: if the main configuration file does not exist.
        """
        self.plugin_name = plugin_name
        self.plugin_home = plugin_home
        self.app_class = get_app_class(app_class, App)
        self.extra_daemon_class = get_extra_daemon_class(extra_daemon_class,
                                                         ExtraDaemon)
        if config_filepath is None:
            self._config_filepath = get_configuration_path(self.plugin_home)
        else:
            self._config_filepath = config_filepath
        if not os.path.isfile(self._config_filepath):
            raise BadPlugin("configuration file: %s is missing" %
                            self._config_filepath)
        # NOTE(review): an explicit config_filepath is not what gets read
        # when dont_read_config_overrides is True -- confirm this is wanted.
        if dont_read_config_overrides:
            paths = [get_configuration_path(self.plugin_home)]
        else:
            paths = get_configuration_paths(plugin_name, plugin_home)
        # Only files that exist are kept.
        self.paths = [x for x in paths if os.path.isfile(x)]
        self._commands = None
        self._doc = None
        self.__loaded = False

    def get_schema(self):
        """Return a private deep copy of the module-level SCHEMA."""
        return copy.deepcopy(SCHEMA)

    def get_configuration_env_dict(self, ignore_keys_starting_with=None,
                                   limit_to_section=None):
        """Flatten the loaded configuration into environment variables.

        Each variable is named
        "<MFMODULE>_CURRENT_PLUGIN_<SECTION>_<OPTION>" (upper-cased, "-"
        replaced by "_"). Booleans are rendered as "1"/"0", every other
        value through "%s". Sections and options whose name contains
        anything but letters, digits, "-" and "_" are skipped.

        Args:
            ignore_keys_starting_with: optional prefix (anything accepted
                by str.startswith); options starting with it, after
                stripping whitespace, are left out.
            limit_to_section: when given, only this section is exported
                (nothing is exported if the section does not exist).

        Returns:
            dict mapping variable names to string values.
        """
        self.load()
        if limit_to_section is None:
            sections = self._doc.keys()
        elif limit_to_section in self._doc:
            sections = [limit_to_section]
        else:
            sections = []
        env_var_dict = {}
        for section in sections:
            if not re.match("^[a-zA-Z0-9-_]+$", section):
                continue
            for option, val in self._doc[section].items():
                if ignore_keys_starting_with and \
                        option.strip().startswith(ignore_keys_starting_with):
                    continue
                if not re.match("^[a-zA-Z0-9-_]+$", option):
                    continue
                name = "%s_CURRENT_PLUGIN_%s_%s" % \
                    (MFMODULE, section.upper().replace('-', '_'),
                     option.upper().replace('-', '_'))
                if isinstance(val, bool):
                    env_var_dict[name] = "1" if val else "0"
                else:
                    env_var_dict[name] = "%s" % val
        return env_var_dict

    def __get_schema(self, parser):
        """Expand wildcard schema keys against the sections of `parser`.

        Every key of get_schema() containing "*" or "?" is removed and
        replaced by one deep-copied entry per parser section matching it
        (fnmatch rules). A pattern matching no section simply disappears.
        """
        schema = self.get_schema()
        patterns = [x for x in schema.keys() if "*" in x or "?" in x]
        for pattern in patterns:
            template = schema.pop(pattern)
            for section in parser.sections():
                if fnmatch.fnmatch(section, pattern):
                    schema[section] = copy.deepcopy(template)
        return schema

    def __get_public_schema(self, parser):
        """Return the expanded schema restricted to public names.

        Sections and options whose name starts with "_" are dropped, as
        are sections without a "schema" entry.
        """
        schema = self.__get_schema(parser)
        public_schema = {}
        for section, rules in schema.items():
            if section.startswith('_'):
                continue
            if 'schema' not in rules:
                continue
            public_rules = {x: y for x, y in rules.items() if x != "schema"}
            public_rules['schema'] = {
                key: value for key, value in rules['schema'].items()
                if not key.startswith('_')
            }
            public_schema[section] = public_rules
        return public_schema

    def _get_debug(self):
        """Return a dict of the public, JSON-able members plus "_doc".

        Members whose name starts with "_" or "raw_" are excluded.
        """
        self.load()
        res = {x: y for x, y in inspect.getmembers(self)
               if is_jsonable(y) and not x.startswith('_')
               and not x.startswith('raw_')}
        res["_doc"] = self._doc
        return res

    def get_final_document(self, validated_document):
        """Hook to post-process the validated document.

        Called at the end of loading with the document already enriched
        with resolved IP options; the default returns it unchanged.
        """
        return validated_document

    def __get_final_document(self, validated_document):
        """Copy the validated document, adding resolved IP options.

        For an option named "hostname" or ending in "_hostname", an
        "<option>_ip" entry holding resolve(value) is added unless the
        section already defines it. For "hostnames" / "*_hostnames" the
        value is split on ";" and an "<option>_ips" entry holds the
        ";"-joined results. A failed resolution (None) is stored as
        "dns_error". The result is then passed to get_final_document().

        Sections without any option are not copied.
        """
        vdocument = {}
        for section, options in validated_document.items():
            for option, val in options.items():
                target = vdocument.setdefault(section, {})
                if option.endswith("_hostname") or option == "hostname":
                    ip_key = "%s_ip" % option
                    if ip_key not in options:
                        resolved = resolve(val)
                        if resolved is None:
                            resolved = "dns_error"
                        target[ip_key] = resolved
                elif option.endswith("_hostnames") or option == "hostnames":
                    ips_key = "%s_ips" % option
                    if ips_key not in options:
                        resolved_list = []
                        for hostname in val.split(";"):
                            resolved = resolve(hostname)
                            if resolved is None:
                                resolved = "dns_error"
                            resolved_list.append(resolved)
                        target[ips_key] = ";".join(resolved_list)
                target[option] = val
        try:
            return self.get_final_document(vdocument)
        except BadPlugin:
            # Expected failure type of the hook: propagate silently.
            raise
        except Exception:
            # Unexpected failure: dump the document to help debugging,
            # then let the exception go through.
            print("exception catched during get_final_document(), vdocument:",
                  file=sys.stderr)
            print(get_nice_dump(vdocument), file=sys.stderr)
            print("=> reraising", file=sys.stderr)
            raise

    def __validate(self, paths, public=False):
        """Parse `paths` and validate the result.

        Args:
            paths: list of configuration file paths read together.
            public: validate against the public schema (no "_" names)
                instead of the full one.

        Returns:
            (False, errors, None) on validation failure, otherwise
            (True, {}, normalized_document).

        Raises:
            BadPlugin: if the files cannot be read.
        """
        v = cerberus.Validator()
        v.allow_unknown = False
        v.require_all = True
        parser = opinionated_configparser.OpinionatedConfigParser(
            delimiters=("=",), comment_prefixes=("#",))
        # Keep option names as written instead of lower-casing them.
        parser.optionxform = str
        try:
            parser.read(paths)
        except Exception as e:
            raise BadPlugin("can't read configuration paths: %s" %
                            ", ".join(paths), original_exception=e)
        if public:
            schema = self.__get_public_schema(parser)
        else:
            schema = self.__get_schema(parser)
        status = validate_configparser(v, parser, schema, public=public)
        if status is False:
            return (status, v.errors, None)
        return (True, {}, v.normalized(v.document))

    def load(self):
        """Parse, validate and materialize the configuration, once.

        Returns:
            False when the configuration is already loaded or currently
            being loaded (add_app() / add_extra_daemon() re-enter this
            method), None after an actual load.

        Raises:
            BadPlugin: when the configuration is invalid. The loaded flag
                is then reset, so a later call reports the same error
                again instead of working on a half-initialized object.
        """
        if self.__loaded:
            # Checked before building the plugin environment: every
            # accessor calls load(), most of the time for nothing.
            return False
        with PluginEnvContextManager(get_current_envs(self.plugin_name,
                                                      self.plugin_home)):
            # Set before doing the work because add_app() and
            # add_extra_daemon() call load() while commands are built.
            self.__loaded = True
            try:
                status, vv_errors, v_document = self.__validate(self.paths)
                if status is False:
                    self.__raise_invalid_configuration(vv_errors)
                self._doc = self.__get_final_document(v_document)
                self.__build_commands()
                self.after_load()
            except BaseException:
                self.__loaded = False
                raise

    def __raise_invalid_configuration(self, vv_errors):
        """Raise the BadPlugin that best locates a validation failure.

        `vv_errors` are the errors of the combined validation. With
        several files, each one is validated alone (the main file against
        the full schema, the others against the public one) to name the
        faulty file; if none fails alone, all candidates are listed.
        """
        if len(self.paths) == 1:
            raise BadPlugin(
                "invalid configuration file: %s" % self._config_filepath,
                validation_errors=cerberus_errors_to_human_string(vv_errors))
        # we are trying to find the bad file
        status, v_errors, _ = self.__validate([self._config_filepath])
        if status is False:
            raise BadPlugin(
                "invalid configuration file: %s" % self._config_filepath,
                validation_errors=cerberus_errors_to_human_string(v_errors))
        for p in self.paths:
            if p == self._config_filepath:
                continue
            status, v_errors, _ = self.__validate([p], public=True)
            if status is False:
                raise BadPlugin(
                    "invalid configuration, please fix: %s" % p,
                    validation_errors=cerberus_errors_to_human_string(
                        v_errors))
        candidates = " or ".join(self.paths)
        raise BadPlugin(
            "invalid configuration, please fix: %s" % candidates,
            validation_errors=cerberus_errors_to_human_string(vv_errors))

    def __build_commands(self):
        """Instantiate apps and extra daemons from the loaded document."""
        self._apps = []
        self._extra_daemons = []
        custom = self._doc.get('custom', {})
        # FIXME: step mfdata ?
        for section in [x for x in self._doc.keys()
                        if x.startswith(("app_", "step_"))]:
            prefix = "app_" if section.startswith("app_") else "step_"
            command = self.app_class(self.plugin_home, self.plugin_name,
                                     section[len(prefix):],
                                     self._doc[section], custom)
            self.add_app(command)
        for section in [x for x in self._doc.keys()
                        if x.startswith("extra_daemon_")]:
            command = self.extra_daemon_class(
                self.plugin_home,
                self.plugin_name,
                section.replace('extra_daemon_', '', 1),
                self._doc[section],
                custom)
            self.add_extra_daemon(command)

    def after_load(self):
        """Hook called at the end of a successful load(); does nothing."""
        pass

    def add_app(self, app):
        """Append `app` to the list of apps (loading first if needed)."""
        self.load()
        self._apps.append(app)

    def add_step(self, app):
        """Alias of add_app()."""
        self.add_app(app)

    def add_extra_daemon(self, extra_daemon):
        """Append `extra_daemon` to the list of extra daemons."""
        self.load()
        self._extra_daemons.append(extra_daemon)

    def load_full(self):
        """Same as load(), without a return value."""
        self.load()

    @property
    def apps(self):
        """List of app objects built from the configuration."""
        self.load()
        return self._apps

    @property
    def steps(self):
        """Same list as `apps`."""
        self.load()
        return self._apps

    @property
    def extra_daemons(self):
        """List of extra daemon objects built from the configuration."""
        self.load()
        return self._extra_daemons

    @property
    def version(self):
        """Value of [general]/_version."""
        self.load()
        return self._doc['general']['_version']

    @property
    def release(self):
        """Value of [general]/_release."""
        self.load()
        return self._doc['general']['_release']

    @property
    def summary(self):
        """Value of [general]/_summary."""
        self.load()
        return self._doc['general']['_summary']

    @property
    def license(self):
        """Value of [general]/_license."""
        self.load()
        return self._doc['general']['_license']

    @property
    def maintainer(self):
        """Value of [general]/_maintainer."""
        self.load()
        return self._doc['general']['_maintainer']

    @property
    def packager(self):
        """Alias of `maintainer`."""
        return self.maintainer

    @property
    def vendor(self):
        """Value of [general]/_vendor."""
        self.load()
        return self._doc['general']['_vendor']

    @property
    def url(self):
        """Value of [general]/_url."""
        self.load()
        return self._doc['general']['_url']

    @property
    def add_plugin_dir_to_python_path(self):
        """Value of [general]/_add_plugin_dir_to_python_path."""
        self.load()
        return self._doc['general']['_add_plugin_dir_to_python_path']