Source code for opentelemetry.sdk.metrics

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading
from typing import Dict, Sequence, Tuple, Type

from opentelemetry import metrics as metrics_api
from opentelemetry.sdk.metrics.export.aggregate import Aggregator
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo

logger = logging.getLogger(__name__)


[docs]def get_labels_as_key(labels: Dict[str, str]) -> Tuple[Tuple[str, str]]: """Gets a list of labels that can be used as a key in a dictionary.""" return tuple(sorted(labels.items()))
[docs]class BaseBoundInstrument: """Class containing common behavior for all bound metric instruments. Bound metric instruments are responsible for operating on data for metric instruments for a specific set of labels. Args: value_type: The type of values for this bound instrument (int, float). enabled: True if the originating instrument is enabled. aggregator: The aggregator for this bound metric instrument. Will handle aggregation upon updates and checkpointing of values for exporting. """ def __init__( self, value_type: Type[metrics_api.ValueT], enabled: bool, aggregator: Aggregator, ): self.value_type = value_type self.enabled = enabled self.aggregator = aggregator self._ref_count = 0 self._ref_count_lock = threading.Lock() def _validate_update(self, value: metrics_api.ValueT) -> bool: if not self.enabled: return False if not isinstance(value, self.value_type): logger.warning( "Invalid value passed for %s.", self.value_type.__name__ ) return False return True
[docs] def update(self, value: metrics_api.ValueT): self.aggregator.update(value)
[docs] def release(self): self.decrease_ref_count()
[docs] def decrease_ref_count(self): with self._ref_count_lock: self._ref_count -= 1
[docs] def increase_ref_count(self): with self._ref_count_lock: self._ref_count += 1
[docs] def ref_count(self): with self._ref_count_lock: return self._ref_count
def __repr__(self): return '{}(data="{}")'.format( type(self).__name__, self.aggregator.current )
[docs]class BoundCounter(metrics_api.BoundCounter, BaseBoundInstrument):
[docs] def add(self, value: metrics_api.ValueT) -> None: """See `opentelemetry.metrics.BoundCounter.add`.""" if self._validate_update(value): self.update(value)
[docs]class BoundMeasure(metrics_api.BoundMeasure, BaseBoundInstrument):
[docs] def record(self, value: metrics_api.ValueT) -> None: """See `opentelemetry.metrics.BoundMeasure.record`.""" if self._validate_update(value): self.update(value)
[docs]class Metric(metrics_api.Metric): """Base class for all metric types. Also known as metric instrument. This is the class that is used to represent a metric that is to be continuously recorded and tracked. Each metric has a set of bound metrics that are created from the metric. See `BaseBoundInstrument` for information on bound metric instruments. """ BOUND_INSTR_TYPE = BaseBoundInstrument def __init__( self, name: str, description: str, unit: str, value_type: Type[metrics_api.ValueT], meter: "Meter", label_keys: Sequence[str] = (), enabled: bool = True, ): self.name = name self.description = description self.unit = unit self.value_type = value_type self.meter = meter self.label_keys = label_keys self.enabled = enabled self.bound_instruments = {} self.bound_instruments_lock = threading.Lock()
[docs] def bind(self, labels: Dict[str, str]) -> BaseBoundInstrument: """See `opentelemetry.metrics.Metric.bind`.""" key = get_labels_as_key(labels) with self.bound_instruments_lock: bound_instrument = self.bound_instruments.get(key) if bound_instrument is None: bound_instrument = self.BOUND_INSTR_TYPE( self.value_type, self.enabled, # Aggregator will be created based off type of metric self.meter.batcher.aggregator_for(self.__class__), ) self.bound_instruments[key] = bound_instrument bound_instrument.increase_ref_count() return bound_instrument
def __repr__(self): return '{}(name="{}", description="{}")'.format( type(self).__name__, self.name, self.description ) UPDATE_FUNCTION = lambda x, y: None # noqa: E731
[docs]class Counter(Metric, metrics_api.Counter): """See `opentelemetry.metrics.Counter`. """ BOUND_INSTR_TYPE = BoundCounter
[docs] def add(self, value: metrics_api.ValueT, labels: Dict[str, str]) -> None: """See `opentelemetry.metrics.Counter.add`.""" bound_intrument = self.bind(labels) bound_intrument.add(value) bound_intrument.release()
UPDATE_FUNCTION = add
[docs]class Measure(Metric, metrics_api.Measure): """See `opentelemetry.metrics.Measure`.""" BOUND_INSTR_TYPE = BoundMeasure
[docs] def record( self, value: metrics_api.ValueT, labels: Dict[str, str] ) -> None: """See `opentelemetry.metrics.Measure.record`.""" bound_intrument = self.bind(labels) bound_intrument.record(value) bound_intrument.release()
UPDATE_FUNCTION = record
[docs]class Observer(metrics_api.Observer): """See `opentelemetry.metrics.Observer`.""" def __init__( self, callback: metrics_api.ObserverCallbackT, name: str, description: str, unit: str, value_type: Type[metrics_api.ValueT], meter: "Meter", label_keys: Sequence[str] = (), enabled: bool = True, ): self.callback = callback self.name = name self.description = description self.unit = unit self.value_type = value_type self.meter = meter self.label_keys = label_keys self.enabled = enabled self.aggregators = {}
[docs] def observe( self, value: metrics_api.ValueT, labels: Dict[str, str] ) -> None: if not self.enabled: return if not isinstance(value, self.value_type): logger.warning( "Invalid value passed for %s.", self.value_type.__name__ ) return key = get_labels_as_key(labels) if key not in self.aggregators: # TODO: how to cleanup aggregators? self.aggregators[key] = self.meter.batcher.aggregator_for( self.__class__ ) aggregator = self.aggregators[key] aggregator.update(value)
[docs] def run(self) -> bool: try: self.callback(self) # pylint: disable=broad-except except Exception as exc: logger.warning( "Exception while executing observer callback: %s.", exc ) return False return True
def __repr__(self): return '{}(name="{}", description="{}")'.format( type(self).__name__, self.name, self.description )
[docs]class Record: """Container class used for processing in the `Batcher`""" def __init__( self, metric: metrics_api.MetricT, labels: Dict[str, str], aggregator: Aggregator, ): self.metric = metric self.labels = labels self.aggregator = aggregator
[docs]class Meter(metrics_api.Meter): """See `opentelemetry.metrics.Meter`. Args: instrumentation_info: The `InstrumentationInfo` for this meter. stateful: Indicates whether the meter is stateful. """ def __init__( self, instrumentation_info: "InstrumentationInfo", stateful: bool, resource: Resource = Resource.create_empty(), ): self.instrumentation_info = instrumentation_info self.metrics = set() self.observers = set() self.batcher = UngroupedBatcher(stateful) self.observers_lock = threading.Lock() self.resource = resource
[docs] def collect(self) -> None: """Collects all the metrics created with this `Meter` for export. Utilizes the batcher to create checkpoints of the current values in each aggregator belonging to the metrics that were created with this meter instance. """ self._collect_metrics() self._collect_observers()
def _collect_metrics(self) -> None: for metric in self.metrics: if not metric.enabled: continue to_remove = [] with metric.bound_instruments_lock: for labels, bound_instr in metric.bound_instruments.items(): # TODO: Consider storing records in memory? record = Record(metric, labels, bound_instr.aggregator) # Checkpoints the current aggregators # Applies different batching logic based on type of batcher self.batcher.process(record) if bound_instr.ref_count() == 0: to_remove.append(labels) # Remove handles that were released for labels in to_remove: del metric.bound_instruments[labels] def _collect_observers(self) -> None: with self.observers_lock: for observer in self.observers: if not observer.enabled: continue if not observer.run(): continue for labels, aggregator in observer.aggregators.items(): record = Record(observer, labels, aggregator) self.batcher.process(record)
[docs] def record_batch( self, labels: Dict[str, str], record_tuples: Sequence[Tuple[metrics_api.Metric, metrics_api.ValueT]], ) -> None: """See `opentelemetry.metrics.Meter.record_batch`.""" # TODO: Avoid enconding the labels for each instrument, encode once # and reuse. for metric, value in record_tuples: metric.UPDATE_FUNCTION(value, labels)
[docs] def create_metric( self, name: str, description: str, unit: str, value_type: Type[metrics_api.ValueT], metric_type: Type[metrics_api.MetricT], label_keys: Sequence[str] = (), enabled: bool = True, ) -> metrics_api.MetricT: """See `opentelemetry.metrics.Meter.create_metric`.""" # Ignore type b/c of mypy bug in addition to missing annotations metric = metric_type( # type: ignore name, description, unit, value_type, self, label_keys=label_keys, enabled=enabled, ) self.metrics.add(metric) return metric
[docs] def register_observer( self, callback: metrics_api.ObserverCallbackT, name: str, description: str, unit: str, value_type: Type[metrics_api.ValueT], label_keys: Sequence[str] = (), enabled: bool = True, ) -> metrics_api.Observer: ob = Observer( callback, name, description, unit, value_type, self, label_keys, enabled, ) with self.observers_lock: self.observers.add(ob) return ob
[docs] def unregister_observer(self, observer: "Observer") -> None: with self.observers_lock: self.observers.remove(observer)
[docs]class MeterProvider(metrics_api.MeterProvider): def __init__(self, resource: Resource = Resource.create_empty()): self.resource = resource
[docs] def get_meter( self, instrumenting_module_name: str, stateful=True, instrumenting_library_version: str = "", ) -> "metrics_api.Meter": if not instrumenting_module_name: # Reject empty strings too. raise ValueError("get_meter called with missing module name.") return Meter( InstrumentationInfo( instrumenting_module_name, instrumenting_library_version ), stateful=stateful, resource=self.resource, )