# Source code for asab.metrics.service

import configparser
import logging
import asyncio
import os

from ..config import Config
from ..abc import Service
from .metrics import (
	Metric, Counter, EPSCounter, Gauge, DutyCycle, AggregationCounter, Histogram,
	CounterWithDynamicTags, AggregationCounterWithDynamicTags, HistogramWithDynamicTags
)
from .storage import Storage


#

L = logging.getLogger(__name__)

#


class MetricsService(Service):
	"""
	Service that keeps the registry of metrics, flushes them and passes the
	stored values to the configured targets.

	A flush is triggered by every "Application.tick/60!" PubSub message and
	once more from `finalize()`.

	Configuration options read by this service:

	* `[asab:metrics] target` - whitespace-separated list of target names (optional).
	* `[asab:metrics:<target>] type` - type of the target, `influxdb` or `http`;
	  when it is not configured, the target name itself is used as the type.
	* `[asab:metrics] native_metrics` - boolean, enables `NativeMetrics`.
	"""

	def __init__(self, app, service_name):
		"""
		Initialize the service, collect global tags and build the targets.

		:param app: The application object; `HostName` and `PubSub` are read from it.
		:param service_name: Name of the service, passed to `Service.__init__()`.
		:raises RuntimeError: If a configured target has an unknown type.
		"""
		super().__init__(app, service_name)

		self.Metrics = []
		self.Targets = []

		# Global tags, copied into the static tags of every registered metric
		self.Tags = {
			"host": app.HostName,
			"appclass": app.__class__.__name__,
		}

		# Optional tags; each one is added only if its environment variable is set
		optional_tags = (
			# An identifier of the host machine (node)
			("NODE_ID", "node_id"),
			("SERVICE_ID", "service_id"),
			# A unique identifier of a microservice
			("INSTANCE_ID", "instance_id"),
		)
		for env_name, tag_name in optional_tags:
			tag_value = os.getenv(env_name, None)
			if tag_value is not None:
				self.Tags[tag_name] = tag_value

		self.Storage = Storage()

		app.PubSub.subscribe("Application.tick/60!", self._on_flushing_event)

		if Config.has_option('asab:metrics', 'target'):
			# str.split() without arguments already strips the surrounding whitespace
			for target_name in Config.get('asab:metrics', 'target').split():
				section = 'asab:metrics:{}'.format(target_name)
				try:
					target_type = Config.get(section, 'type')
				except (configparser.NoSectionError, configparser.NoOptionError):
					# This allows to specify the type of the target by its name,
					# also when the target has no configuration section at all
					target_type = target_name

				# Target modules are imported lazily, only when they are configured
				if target_type == 'influxdb':
					from .influxdb import InfluxDBTarget
					target = InfluxDBTarget(self, section)

				elif target_type == 'http':
					from .http import HTTPTarget
					target = HTTPTarget(self, section)

				else:
					raise RuntimeError("Unknown target type {}".format(target_type))

				self.Targets.append(target)

		if Config.getboolean('asab:metrics', 'native_metrics'):
			from .native import NativeMetrics
			self._native_svc = NativeMetrics(self.App, self)


	async def finalize(self, app):
		"""
		Flush the metrics and send them to the targets for the last time.

		:param app: The application object (not used here).
		"""
		await self._on_flushing_event("finalize!")


	def clear(self):
		"""
		Remove all registered metrics and clear the storage.
		"""
		self.Metrics.clear()
		self.Storage.clear()


	def _flush_metrics(self):
		"""
		Publish "Metrics.flush!" and call `flush(now)` on every registered metric.

		An exception raised by one metric is logged and does not stop
		the flushing of the remaining metrics.

		:return: The value of `self.App.time()` that was passed to the metrics.
		"""
		now = self.App.time()

		self.App.PubSub.publish("Metrics.flush!")
		for metric in self.Metrics:
			try:
				metric.flush(now)
			except Exception:
				L.exception("Exception during metric.flush()")

		return now


	async def _on_flushing_event(self, event_type):
		"""
		Flush the metrics and let every target process the stored values.

		Nothing happens when no metric is registered. All targets are processed
		concurrently and the method returns once all of them are finished.

		:param event_type: The PubSub message type (not used here).
		"""
		if not self.Metrics:
			return

		now = self._flush_metrics()

		pending = {
			asyncio.ensure_future(target.process(self.Storage.Metrics, now))
			for target in self.Targets
		}

		# asyncio.wait() returns after the timeout even if some tasks are not finished yet,
		# so keep waiting until nothing is pending
		while pending:
			done, pending = await asyncio.wait(pending, timeout=180.0, return_when=asyncio.ALL_COMPLETED)

			# Retrieve the result of finished tasks so that a failure of a target is reported
			for task in done:
				if task.cancelled():
					continue
				error = task.exception()
				if error is not None:
					L.error("Exception during target.process()", exc_info=error)


	def _add_metric(self, metric: Metric, metric_name: str, tags=None, reset=None, help=None, unit=None):
		"""
		Register the metric in this service and bind it to a record in the storage.

		:param metric: The metric object to be registered.
		:param metric_name: Name under which the metric is added to the storage.
		:param tags: Optional dictionary of static tags; they override global tags of the same name.
		:param reset: Passed to `Storage.add()`.
		:param help: Passed to `Storage.add()`.
		:param unit: Passed to `Storage.add()`.
		"""
		# Add global tags
		metric.StaticTags.update(self.Tags)
		metric.App = self.App

		# Add local static tags
		if tags is not None:
			metric.StaticTags.update(tags)

		# The storage receives its own copy of the static tags
		metric._initialize_storage(
			self.Storage.add(metric_name, tags=metric.StaticTags.copy(), reset=reset, help=help, unit=unit)
		)

		self.Metrics.append(metric)


	def create_gauge(self, metric_name, tags=None, init_values=None, help=None, unit=None):
		"""
		Create a `Gauge` metric, register it and return it.

		:param metric_name: Name of the metric.
		:param tags: Optional dictionary of static tags added to the global ones.
		:param init_values: Passed to the `Gauge` constructor.
		:param help: Passed to the storage together with the metric.
		:param unit: Passed to the storage together with the metric.
		:return: The registered `Gauge` object.
		"""
		m = Gauge(init_values=init_values)
		self._add_metric(m, metric_name, tags=tags, help=help, unit=unit)
		return m


	def create_counter(self, metric_name, tags=None, init_values=None, reset: bool = True, help=None, unit=None, dynamic_tags=False):
		"""
		Create a counter metric, register it and return it.

		:param metric_name: Name of the metric.
		:param tags: Optional dictionary of static tags added to the global ones.
		:param init_values: Passed to the metric constructor.
		:param reset: Passed to the storage together with the metric.
		:param help: Passed to the storage together with the metric.
		:param unit: Passed to the storage together with the metric.
		:param dynamic_tags: If true, `CounterWithDynamicTags` is created instead of `Counter`.
		:return: The registered metric object.
		"""
		if dynamic_tags:
			m = CounterWithDynamicTags(init_values=init_values)
		else:
			m = Counter(init_values=init_values)
		self._add_metric(m, metric_name, tags=tags, reset=reset, help=help, unit=unit)
		return m


	def create_eps_counter(self, metric_name, tags=None, init_values=None, reset: bool = True, help=None, unit=None):
		"""
		Create an `EPSCounter` metric, register it and return it.

		:param metric_name: Name of the metric.
		:param tags: Optional dictionary of static tags added to the global ones.
		:param init_values: Passed to the `EPSCounter` constructor.
		:param reset: Passed to the storage together with the metric.
		:param help: Passed to the storage together with the metric.
		:param unit: Passed to the storage together with the metric.
		:return: The registered `EPSCounter` object.
		"""
		m = EPSCounter(init_values=init_values)
		self._add_metric(m, metric_name, tags=tags, reset=reset, help=help, unit=unit)
		return m


	def create_duty_cycle(self, metric_name, tags=None, init_values=None, help=None, unit=None):
		"""
		Create a `DutyCycle` metric, register it and return it.

		:param metric_name: Name of the metric.
		:param tags: Optional dictionary of static tags added to the global ones.
		:param init_values: Passed to the `DutyCycle` constructor.
		:param help: Passed to the storage together with the metric.
		:param unit: Passed to the storage together with the metric.
		:return: The registered `DutyCycle` object.
		"""
		m = DutyCycle(self.App, init_values=init_values)
		self._add_metric(m, metric_name, tags=tags, help=help, unit=unit)
		return m


	def create_aggregation_counter(self, metric_name, tags=None, init_values=None, reset: bool = True, aggregator=max, help=None, unit=None, dynamic_tags=False):
		"""
		Create an aggregation counter metric, register it and return it.

		:param metric_name: Name of the metric.
		:param tags: Optional dictionary of static tags added to the global ones.
		:param init_values: Passed to the metric constructor.
		:param reset: Passed to the storage together with the metric.
		:param aggregator: Callable passed to the metric constructor, `max` by default.
		:param help: Passed to the storage together with the metric.
		:param unit: Passed to the storage together with the metric.
		:param dynamic_tags: If true, `AggregationCounterWithDynamicTags` is created instead of `AggregationCounter`.
		:return: The registered metric object.
		"""
		if dynamic_tags:
			m = AggregationCounterWithDynamicTags(init_values=init_values, aggregator=aggregator)
		else:
			m = AggregationCounter(init_values=init_values, aggregator=aggregator)
		self._add_metric(m, metric_name, tags=tags, reset=reset, help=help, unit=unit)
		return m


	def create_histogram(self, metric_name, buckets: list, tags=None, init_values=None, reset: bool = True, help=None, unit=None, dynamic_tags=False):
		"""
		Create a histogram metric, register it and return it.

		:param metric_name: Name of the metric.
		:param buckets: List passed to the metric constructor as `buckets`.
		:param tags: Optional dictionary of static tags added to the global ones.
		:param init_values: Passed to the metric constructor.
		:param reset: Passed to the storage together with the metric.
		:param help: Passed to the storage together with the metric.
		:param unit: Passed to the storage together with the metric.
		:param dynamic_tags: If true, `HistogramWithDynamicTags` is created instead of `Histogram`.
		:return: The registered metric object.
		"""
		if dynamic_tags:
			m = HistogramWithDynamicTags(buckets=buckets, init_values=init_values)
		else:
			m = Histogram(buckets=buckets, init_values=init_values)
		self._add_metric(m, metric_name, tags=tags, reset=reset, help=help, unit=unit)
		return m