# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import threading
from collections import namedtuple
from opentelemetry.util import time_ns
class Aggregator(abc.ABC):
    """Base class for aggregators.

    Aggregators are responsible for holding aggregated values and taking a
    snapshot of these values upon export (checkpoint).

    Subclasses must implement ``update``, ``take_checkpoint`` and ``merge``;
    the class cannot be instantiated until all three are overridden.
    """

    def __init__(self):
        # Value accumulated by ``update``.
        self.current = None
        # Snapshot stored by ``take_checkpoint``.
        self.checkpoint = None
        # ``merge`` implementations in this module read
        # ``other.last_update_timestamp``; give every aggregator a default so
        # that the attribute always exists. ``None`` means "never updated".
        self.last_update_timestamp = None

    @abc.abstractmethod
    def update(self, value):
        """Updates the current with the new value."""

    @abc.abstractmethod
    def take_checkpoint(self):
        """Stores a snapshot of the current value."""

    @abc.abstractmethod
    def merge(self, other):
        """Combines two aggregator values."""
class CounterAggregator(Aggregator):
    """Aggregator for Counter metrics.

    Keeps a running sum in ``current``. ``take_checkpoint`` moves that sum
    into ``checkpoint`` and restarts the running sum at zero. All mutations
    of this instance happen while holding ``self._lock``.
    """

    def __init__(self):
        super().__init__()
        self.current = 0
        self.checkpoint = 0
        self._lock = threading.Lock()
        # Set by ``update`` from ``time_ns()``; ``None`` until the first
        # update.
        self.last_update_timestamp = None

    def update(self, value):
        """Adds ``value`` to the running sum and records the update time."""
        with self._lock:
            self.current += value
            self.last_update_timestamp = time_ns()

    def take_checkpoint(self):
        """Stores the running sum as the checkpoint and resets the sum to 0."""
        with self._lock:
            self.checkpoint = self.current
            self.current = 0

    def merge(self, other):
        """Adds ``other``'s checkpoint to this one and keeps the later timestamp.

        Args:
            other: aggregator exposing ``checkpoint`` and
                ``last_update_timestamp``. Only this instance's lock is
                taken; ``other`` is read without locking it.
        """
        with self._lock:
            self.checkpoint += other.checkpoint
            self.last_update_timestamp = get_latest_timestamp(
                self.last_update_timestamp, other.last_update_timestamp
            )
class MinMaxSumCountAggregator(Aggregator):
    """Aggregator for Measure metrics that keeps min, max, sum and count."""

    _TYPE = namedtuple("minmaxsumcount", "min max sum count")
    # Shared sentinel meaning "no values recorded". It is compared by
    # identity (``is``), so code in this class must assign this exact object
    # rather than building an equal tuple.
    _EMPTY = _TYPE(None, None, None, 0)

    @classmethod
    def _merge_checkpoint(cls, val1, val2):
        """Returns the combination of two ``_TYPE`` values.

        If either argument is the ``_EMPTY`` sentinel the other argument is
        returned unchanged; otherwise a new ``_TYPE`` is built from the
        element-wise min, max, summed sum and summed count.
        """
        if val1 is cls._EMPTY:
            return val2
        if val2 is cls._EMPTY:
            return val1
        return cls._TYPE(
            min(val1.min, val2.min),
            max(val1.max, val2.max),
            val1.sum + val2.sum,
            val1.count + val2.count,
        )

    def __init__(self):
        super().__init__()
        self.current = self._EMPTY
        self.checkpoint = self._EMPTY
        self._lock = threading.Lock()
        # Set by ``update`` from ``time_ns()``; ``None`` until the first
        # update.
        self.last_update_timestamp = None

    def update(self, value):
        """Folds ``value`` into the current min/max/sum/count."""
        with self._lock:
            if self.current is self._EMPTY:
                # First value since the last checkpoint: it is the min, the
                # max and the sum all at once.
                self.current = self._TYPE(value, value, value, 1)
            else:
                self.current = self._TYPE(
                    min(self.current.min, value),
                    max(self.current.max, value),
                    self.current.sum + value,
                    self.current.count + 1,
                )
            self.last_update_timestamp = time_ns()

    def take_checkpoint(self):
        """Stores the current tuple as the checkpoint and resets to empty."""
        with self._lock:
            self.checkpoint = self.current
            self.current = self._EMPTY

    def merge(self, other):
        """Merges ``other``'s checkpoint into this one, keeping the later timestamp.

        Args:
            other: aggregator exposing a ``_TYPE``-shaped ``checkpoint`` and
                ``last_update_timestamp``. Only this instance's lock is
                taken; ``other`` is read without locking it.
        """
        with self._lock:
            self.checkpoint = self._merge_checkpoint(
                self.checkpoint, other.checkpoint
            )
            self.last_update_timestamp = get_latest_timestamp(
                self.last_update_timestamp, other.last_update_timestamp
            )
class ObserverAggregator(Aggregator):
    """Same as MinMaxSumCount but also with last value.

    The min/max/sum/count part is delegated to an internal
    ``MinMaxSumCountAggregator`` (``self.mmsc``); this class additionally
    tracks the most recently observed value.

    NOTE(review): this class takes no lock of its own, so ``current``,
    ``checkpoint`` and ``last_update_timestamp`` are assigned outside any
    lock. Confirm whether callers serialise access.
    """

    _TYPE = namedtuple("minmaxsumcountlast", "min max sum count last")

    def __init__(self):
        super().__init__()
        self.mmsc = MinMaxSumCountAggregator()
        # Last value passed to ``update``; ``None`` until the first update.
        self.current = None
        self.checkpoint = self._TYPE(None, None, None, 0, None)
        self.last_update_timestamp = None

    def update(self, value):
        """Records ``value`` in the min/max/sum/count and as the last value."""
        self.mmsc.update(value)
        self.current = value
        self.last_update_timestamp = time_ns()

    def take_checkpoint(self):
        """Checkpoints ``mmsc`` and appends the last value to its checkpoint.

        ``self.current`` is not reset here, so the last value carries over
        into later checkpoints until ``update`` is called again.
        """
        self.mmsc.take_checkpoint()
        self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (self.current,)))

    def merge(self, other):
        """Merges ``other`` into this aggregator's checkpoint.

        The min/max/sum/count are merged through ``mmsc``. The ``last``
        field is taken from ``other`` when the merged timestamp equals
        ``other``'s timestamp (including when both are ``None`` or equal);
        otherwise this aggregator's ``last`` is kept.
        """
        self.mmsc.merge(other.mmsc)
        last = self.checkpoint.last
        self.last_update_timestamp = get_latest_timestamp(
            self.last_update_timestamp, other.last_update_timestamp
        )
        if self.last_update_timestamp == other.last_update_timestamp:
            last = other.checkpoint.last
        self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,)))
def get_latest_timestamp(time_stamp, other_timestamp):
    """Returns the later of two timestamps, treating ``None`` as "no timestamp".

    Args:
        time_stamp: first timestamp, or ``None``.
        other_timestamp: second timestamp, or ``None``.

    Returns:
        ``other_timestamp`` when ``time_stamp`` is ``None`` or strictly
        smaller than ``other_timestamp``; otherwise ``time_stamp``. The
        result is ``None`` only when both arguments are ``None``.
    """
    if time_stamp is None:
        return other_timestamp
    if other_timestamp is not None and time_stamp < other_timestamp:
        return other_timestamp
    return time_stamp