"""utility classes and functions for managing metwork plugins."""
import logging
import six
import glob
import os
import shutil
import hashlib
import envtpl
from mfutil import BashWrapperException, BashWrapperOrRaise, BashWrapper
from mfutil import mkdir_p_or_die, get_unique_hexa_identifier
from configparser_extended import ExtendedConfigParser
RUNTIME_HOME = os.environ.get('MODULE_RUNTIME_HOME', '/tmp')
MFEXT_HOME = os.environ['MFEXT_HOME']
MODULE_LOWERCASE = os.environ['MODULE_LOWERCASE']
SPEC_TEMPLATE = os.path.join(MFEXT_HOME, "share", "templates", "plugin.spec")
# FIXME: doc
[docs]class MFUtilPluginAlreadyInstalled(Exception):
"""Exception class raised when a plugin is already installed."""
pass
[docs]class MFUtilPluginNotInstalled(Exception):
"""Exception class raised when a plugin is not installed."""
pass
[docs]class MFUtilPluginCantUninstall(BashWrapperException):
"""Exception class raised when we can't uninstall a plugin."""
pass
[docs]class MFUtilPluginCantInstall(BashWrapperException):
"""Exception class raised when we can't install a plugin."""
pass
[docs]class MFUtilPluginCantInit(BashWrapperException):
"""Exception class raised when we can't init the plugin base."""
pass
[docs]class MFUtilPluginCantBuild(BashWrapperException):
"""Exception class raised when we can't build a plugin."""
pass
[docs]class MFUtilPluginBaseNotInitialized(Exception):
"""Exception class raised when the plugin base is not initialized."""
pass
[docs]class MFUtilPluginFileNotFound(Exception):
"""Exception class raised when we can't find the plugin file."""
pass
[docs]class MFUtilPluginInvalid(Exception):
"""Exception class raised when the plugin is invalid."""
pass
def __get_logger():
return logging.getLogger("mfutil.plugins")
[docs]def get_plugins_base_dir():
"""Return the default plugins base directory path.
This value correspond to: ${RUNTIME_HOME}/var/plugins value.
Returns:
(string): the default plugins base directory path.
"""
return os.path.join(RUNTIME_HOME, "var", "plugins")
def _get_plugins_base_dir(plugins_base_dir=None):
if plugins_base_dir is None:
return get_plugins_base_dir()
else:
return plugins_base_dir
def _get_rpm_cmd(command, extra_args="", plugins_base_dir=None,
add_prefix=False):
plugins_base_dir = _get_plugins_base_dir(plugins_base_dir)
base = os.path.join(plugins_base_dir, "base")
if add_prefix:
cmd = 'layer_wrapper --layers=rpm@mfext -- rpm %s ' \
'--dbpath %s --prefix %s %s' % \
(command, base, plugins_base_dir, extra_args)
else:
cmd = 'layer_wrapper --layers=rpm@mfext -- rpm %s ' \
'--dbpath %s %s' % \
(command, base, extra_args)
return cmd
[docs]def init_plugins_base(plugins_base_dir=None):
"""Initialize the plugins base.
Args:
plugins_base_dir (string): alternate plugins base directory
(useful for unit tests).
Raises:
MFUtilPluginCantInit: if we can't init the plugin base.
"""
plugins_base_dir = _get_plugins_base_dir(plugins_base_dir)
shutil.rmtree(plugins_base_dir, ignore_errors=True)
mkdir_p_or_die(plugins_base_dir)
mkdir_p_or_die(os.path.join(plugins_base_dir, "base"))
cmd = _get_rpm_cmd("--initdb", plugins_base_dir=plugins_base_dir)
BashWrapperOrRaise(cmd, MFUtilPluginCantInit,
"can't init %s" % plugins_base_dir)
[docs]def is_plugins_base_initialized(plugins_base_dir=None):
"""Return True is the plugins base is already initialized for the module.
You can pass a plugins_base_dir as argument but you don't have to do that,
except for unit testing.
The plugins base dir is stored by default in :
${MODULE_RUNTIME_HOME}/var/plugins
Args:
plugins_base_dir (string): alternate plugins base directory.
Returns:
boolean: True if the base is initialized.
"""
plugins_base_dir = _get_plugins_base_dir(plugins_base_dir)
return os.path.isfile(os.path.join(plugins_base_dir, "base", "Name"))
def _assert_plugins_base_initialized(plugins_base_dir=None):
if not is_plugins_base_initialized(plugins_base_dir=plugins_base_dir):
raise MFUtilPluginBaseNotInitialized()
def get_installed_plugins(plugins_base_dir=None):
_assert_plugins_base_initialized(plugins_base_dir)
plugins_base_dir = _get_plugins_base_dir(plugins_base_dir)
frmt = "%{name}~~~%{version}~~~%{release}\\n"
cmd = _get_rpm_cmd('-qa', '--qf "%s"' % frmt,
plugins_base_dir=plugins_base_dir)
x = BashWrapperOrRaise(cmd)
tmp = x.stdout.split('\n')
result = []
for line in tmp:
tmp2 = line.split('~~~')
if len(tmp2) == 3:
result.append({'name': tmp2[0],
'version': tmp2[1],
'release': tmp2[2]})
for tmp in os.listdir(plugins_base_dir):
directory_name = tmp.strip()
if directory_name == 'base':
continue
directory = os.path.join(plugins_base_dir, directory_name)
if os.path.islink(directory):
result.append({'name': directory_name,
'version': 'dev_link',
'release': 'dev_link'})
return result
def uninstall_plugin(name, plugins_base_dir=None,
ignore_errors=False, quiet=False):
_assert_plugins_base_initialized(plugins_base_dir)
plugins_base_dir = _get_plugins_base_dir(plugins_base_dir)
infos = get_plugin_info(name, mode="name",
plugins_base_dir=plugins_base_dir)
if infos is None:
raise MFUtilPluginNotInstalled("plugin %s is not installed" % name)
version = infos['metadatas']['version']
release = infos['metadatas']['release']
if release == 'dev_link':
preuninstall_status = _preuninstall_plugin(name, version, release,
quiet=quiet)
if not preuninstall_status and not ignore_errors:
raise MFUtilPluginCantUninstall("can't uninstall plugin %s" % name)
os.unlink(os.path.join(plugins_base_dir, name))
return
preuninstall_status = _preuninstall_plugin(name, version, release)
if not preuninstall_status and not ignore_errors:
raise MFUtilPluginCantUninstall("can't uninstall plugin %s" % name)
cmd = _get_rpm_cmd('-e --noscripts %s' % name,
plugins_base_dir=plugins_base_dir, add_prefix=False)
try:
x = BashWrapperOrRaise(cmd, MFUtilPluginCantUninstall,
"can't uninstall %s" % name)
except MFUtilPluginCantUninstall:
if not ignore_errors:
raise
shutil.rmtree(os.path.join(plugins_base_dir, name), ignore_errors=True)
infos = get_plugin_info(name, mode="name",
plugins_base_dir=plugins_base_dir)
if infos is not None:
raise MFUtilPluginCantUninstall("can't uninstall plugin %s" % name,
bash_wrapper=x)
if os.path.exists(os.path.join(plugins_base_dir, name)):
raise MFUtilPluginCantUninstall("can't uninstall plugin %s "
"(directory still here)" % name)
def _postinstall_plugin(name, version, release, quiet=False):
res = BashWrapper("_plugins.postinstall %s %s %s" %
(name, version, release))
if not res:
if not quiet:
__get_logger().warning("error during postinstall: %s", res)
return False
return True
def _preuninstall_plugin(name, version, release, quiet=False):
res = BashWrapper("_plugins.preuninstall %s %s %s" %
(name, version, release))
if not res:
if not quiet:
__get_logger().warning("error during postuninstall: %s", res)
return False
return True
def install_plugin(plugin_filepath, plugins_base_dir=None,
ignore_errors=False, quiet=False):
_assert_plugins_base_initialized(plugins_base_dir)
if not os.path.isfile(plugin_filepath):
raise MFUtilPluginFileNotFound("plugin file %s not found" %
plugin_filepath)
infos = get_plugin_info(plugin_filepath, mode="file",
plugins_base_dir=plugins_base_dir)
if infos is None:
raise MFUtilPluginInvalid("invalid %s plugin" % plugin_filepath)
name = infos['metadatas']['name']
version = infos['metadatas']['version']
release = infos['metadatas']['release']
installed_infos = get_plugin_info(name, mode="name",
plugins_base_dir=plugins_base_dir)
if installed_infos is not None:
raise MFUtilPluginAlreadyInstalled("plugin %s already installed" %
name)
cmd = _get_rpm_cmd('-Uvh --noscripts --force %s' % plugin_filepath,
plugins_base_dir=plugins_base_dir, add_prefix=True)
x = BashWrapperOrRaise(cmd, MFUtilPluginCantInstall,
"can't install plugin %s" % name)
infos = get_plugin_info(name, mode="name",
plugins_base_dir=plugins_base_dir)
if infos is None:
raise MFUtilPluginCantInstall("can't install plugin %s" % name,
bash_wrapper=x)
postinstall_status = _postinstall_plugin(name, version, release,
quiet=quiet)
if not postinstall_status and not ignore_errors:
try:
uninstall_plugin(name, plugins_base_dir, True, True)
except Exception:
pass
def _make_plugin_spec(dest_file, name, version, summary, license, packager,
vendor, url):
with open(SPEC_TEMPLATE, "r") as f:
template = f.read()
extra_vars = {"NAME": name, "VERSION": version, "SUMMARY": summary,
"LICENSE": license, "PACKAGER": packager, "VENDOR": vendor,
"URL": url}
res = envtpl.render_string(template, extra_variables=extra_vars)
# because, you can have some template inside extra vars
res = envtpl.render_string(res)
with open(dest_file, "w") as f:
f.write(res)
def develop_plugin(plugin_path, name, plugins_base_dir=None,
ignore_errors=False, quiet=False):
plugin_path = os.path.abspath(plugin_path)
plugins_base_dir = _get_plugins_base_dir(plugins_base_dir)
installed_infos = get_plugin_info(name, mode="name",
plugins_base_dir=plugins_base_dir)
if installed_infos is not None:
raise MFUtilPluginAlreadyInstalled("plugin %s already installed" %
name)
shutil.rmtree(os.path.join(plugins_base_dir, name), True)
os.symlink(plugin_path, os.path.join(plugins_base_dir, name))
postinstall_status = _postinstall_plugin(name, "dev_link", "dev_link",
quiet=quiet)
if not postinstall_status and not ignore_errors:
try:
uninstall_plugin(name, plugins_base_dir, True, True)
except Exception:
pass
def _is_dev_link_plugin(name, plugins_base_dir=None):
plugins_base_dir = _get_plugins_base_dir(plugins_base_dir)
return os.path.islink(os.path.join(plugins_base_dir, name))
def build_plugin(plugin_path, plugins_base_dir=None):
plugin_path = os.path.abspath(plugin_path)
plugins_base_dir = _get_plugins_base_dir(plugins_base_dir)
base = os.path.join(plugins_base_dir, "base")
pwd = os.getcwd()
parser = ExtendedConfigParser(config=os.environ.get('MFCONFIG', 'GENERIC'),
strict=False, inheritance="im")
with open(os.path.join(plugin_path, "config.ini"), "r") as f:
config_content = f.read()
if six.PY2:
parser.read_string(config_content.decode('utf-8'))
else:
parser.read_string(config_content)
name = parser['general']['name']
version = parser['general']['version']
summary = parser['general']['summary']
license = parser['general']['license']
try:
packager = parser['general']['packager']
except Exception:
packager = parser['general']['maintainer']
vendor = parser['general']['vendor']
url = parser['general']['url']
tmpdir = os.path.join(RUNTIME_HOME, "tmp",
"plugin_%s" % get_unique_hexa_identifier())
mkdir_p_or_die(os.path.join(tmpdir, "BUILD"))
mkdir_p_or_die(os.path.join(tmpdir, "RPMS"))
mkdir_p_or_die(os.path.join(tmpdir, "SRPMS"))
_make_plugin_spec(os.path.join(tmpdir, "specfile.spec"), name, version,
summary, license, packager, vendor, url)
cmd = "source %s/lib/bash_utils.sh ; " % MFEXT_HOME
cmd = cmd + "layer_load rpm@mfext ; "
cmd = cmd + 'rpmbuild --define "_topdir %s" --define "pwd %s" ' \
'--define "prefix %s" --dbpath %s ' \
'-bb %s/specfile.spec' % (tmpdir, plugin_path, tmpdir,
base, tmpdir)
x = BashWrapperOrRaise(cmd, MFUtilPluginCantBuild,
"can't build plugin %s" % plugin_path)
tmp = glob.glob(os.path.join(tmpdir, "RPMS", "x86_64", "*.rpm"))
if len(tmp) == 0:
raise MFUtilPluginCantBuild("can't find generated plugin" %
plugin_path, bash_wrapper=x)
plugin_path = tmp[0]
new_basename = \
os.path.basename(plugin_path).replace("x86_64.rpm",
"metwork.%s.plugin" %
MODULE_LOWERCASE)
new_plugin_path = os.path.join(pwd, new_basename)
shutil.move(plugin_path, new_plugin_path)
shutil.rmtree(tmpdir, True)
os.chdir(pwd)
return new_plugin_path
def get_plugin_info(name_or_filepath, mode="auto", plugins_base_dir=None):
plugins_base_dir = _get_plugins_base_dir(plugins_base_dir)
_assert_plugins_base_initialized(plugins_base_dir)
res = {}
if mode == "auto":
mode = "name"
if '/' in name_or_filepath or '.' in name_or_filepath:
mode = "file"
else:
if os.path.isfile(name_or_filepath):
mode = "file"
if mode == "file":
cmd = _get_rpm_cmd('-qi', '-p %s' % name_or_filepath,
plugins_base_dir=plugins_base_dir)
elif mode == "name":
if _is_dev_link_plugin(name_or_filepath,
plugins_base_dir=plugins_base_dir):
res['metadatas'] = {}
res['metadatas']['name'] = name_or_filepath
res['metadatas']['release'] = 'dev_link'
res['metadatas']['version'] = 'dev_link'
res['raw_metadata_output'] = 'DEV LINK'
res['raw_files_output'] = 'DEV LINK'
res['files'] = []
return res
cmd = _get_rpm_cmd('-qi', name_or_filepath,
plugins_base_dir=plugins_base_dir)
else:
__get_logger().warning("unknown mode [%s]" % mode)
return None
metadata_output = BashWrapper(cmd)
if not metadata_output:
return None
res['raw_metadata_output'] = metadata_output.stdout
for line in metadata_output.stdout.split('\n'):
tmp = line.strip().split(':', 1)
if len(tmp) <= 1:
continue
name = tmp[0].strip().lower()
value = tmp[1].strip()
if 'metadatas' not in res:
res['metadatas'] = {}
res['metadatas'][name] = value
if mode == "file":
cmd = _get_rpm_cmd('-ql -p %s' % name_or_filepath,
plugins_base_dir=plugins_base_dir)
else:
cmd = _get_rpm_cmd('-ql %s' % name_or_filepath,
plugins_base_dir=plugins_base_dir)
files_output = BashWrapper(cmd)
if not files_output:
return None
res['files'] = [x.strip() for x in files_output.stdout.split('\n')]
res['raw_files_output'] = files_output.stdout
return res
def get_plugin_hash(name_or_filepath, mode="auto", plugins_base_dir=None):
infos = get_plugin_info(name_or_filepath, mode=mode,
plugins_base_dir=plugins_base_dir)
if infos is None:
return None
sid = ", ".join([infos['metadatas'].get('build host', 'unknown'),
infos['metadatas'].get('build date', 'unknown'),
infos['metadatas'].get('size', 'unknown'),
infos['metadatas'].get('version', 'unknown'),
infos['metadatas'].get('release', 'unknown')])
return hashlib.md5(sid.encode('utf8')).hexdigest()