Source code for acquisition.stats

import os

try:
    from time import perf_counter as time_now
except ImportError:
    # fall back to using time
    from time import time as time_now

from statsd import StatsClient

HOSTNAME = os.environ.get("MFCOM_HOSTNAME", "unknown")
MODULE = os.environ["MODULE"]
ADMIN_HOSTNAME_IP = os.environ.get("%s_ADMIN_HOSTNAME_IP" % MODULE, "null")
STATSD_PORT = int(os.environ.get("%s_TELEGRAF_STATSD_PORT" % MODULE, "0"))

__CACHE = {}


class DummyContextManager:

    _start_time = None
    ms = None

    def __enter__(self):
        pass

    def __exit__(self, *args, **kwargs):
        pass

    def _do_nothing(self, *args, **kwargs):
        pass

    def __getattr__(self, name):
        return self._do_nothing

    def start(self, *args, **kwargs):
        self._start_time = time_now()

    def stop(self, *args, **kwargs):
        if self._start_time is None:
            raise RuntimeError('timer has not started.')
        dt = time_now() - self._start_time
        self.ms = 1000.0 * dt
        return self


[docs]class AcquisitionStatsDClient(object): plugin_name = None step_name = None __suffix_cache = None def __init__(self, plugin_name, step_name, extra_tags={}): self.plugin_name = plugin_name self.step_name = step_name self.extra_tags = extra_tags
[docs] def gauge(self, stat, value): raise NotImplementedError()
[docs] def set(self, stat, value): raise NotImplementedError()
[docs] def timing(self, stat, delta): raise NotImplementedError()
[docs] def incr(self, stat, count): raise NotImplementedError()
[docs] def decr(self, stat, count): raise NotImplementedError()
[docs] def timer(self, stat): raise NotImplementedError()
[docs] def pipeline(self): raise NotImplementedError()
def _get_prefix(self): return "" def _get_suffix(self): if self.__suffix_cache is None: tmp = ["", "module=%s" % MODULE.lower(), "host=%s" % HOSTNAME] if self.plugin_name is not None: tmp.append("plugin=%s" % self.plugin_name) if self.step_name is not None: tmp.append("step=%s" % self.step_name) for k, v in self.extra_tags.items(): tmp.append("%s=%s" % (k, v)) self.__suffix_cache = ",".join(tmp) return self.__suffix_cache def _stat(self, stat): return "%s%s%s" % (self._get_prefix(), stat, self._get_suffix())
class AcquisitionPyStatsDClientPipelineAdapter(object): _instance = None _instance2 = None def __init__(self, instance, instance2): self._instance = instance self._instance = instance2 def __enter__(self): pass def __exit__(self, *args, **kwargs): self._instance.send() def gauge(self, stat, value): self._instance.gauge(self._instance2._stat(stat), value) def set(self, stat, value): self._instance.set(self._instance2._stat(stat), value) def timing(self, stat, delta): self._instance.timing(self._instance2._stat(stat), delta) def incr(self, stat, count): self._instance.incr(self._instance2._stat(stat), count) def decr(self, stat, count): self._instance.decr(self._instance2._stat(stat), count) class AcquisitionPyStatsDClient(AcquisitionStatsDClient): _instance = None def _get_instance(self): if self._instance is None: self._instance = StatsClient(host="localhost", port=STATSD_PORT) return self._instance def gauge(self, stat, value): self._get_instance().gauge(self._stat(stat), value) def set(self, stat, value): self._get_instance().set(self._stat(stat), value) def timing(self, stat, delta): self._get_instance().timing(self._stat(stat), delta) def incr(self, stat, count): self._get_instance().incr(self._stat(stat), count) def decr(self, stat, count): self._get_instance().decr(self._stat(stat), count) def timer(self, stat): return self._get_instance().timer(self._stat(stat)) def pipeline(self): i = self._get_instance().pipeline() return AcquisitionPyStatsDClientPipelineAdapter(i, self) class AcquisitionDummyStatsDClient(AcquisitionStatsDClient): def timer(self, stat): return DummyContextManager() def gauge(self, stat, value): pass def set(self, stat, value): pass def timing(self, stat, delta): pass def incr(self, stat, count): pass def decr(self, stat, count): pass def pipeline(self): return DummyContextManager() def get_stats_client(plugin_name, step_name, extra_tags={}): global __CACHE key = "%s.%s" % (plugin_name, step_name) for v in extra_tags.values(): key += ".%s" % v if key not in __CACHE: if ADMIN_HOSTNAME_IP != "null" and STATSD_PORT != 0: __CACHE[key] = AcquisitionPyStatsDClient(plugin_name, step_name, extra_tags) else: __CACHE[key] = AcquisitionDummyStatsDClient(plugin_name, step_name, extra_tags) return __CACHE[key]