Source code for opentelemetry.sdk.trace.export

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import logging
import os
import sys
import threading
import typing
from enum import Enum

from opentelemetry.context import attach, detach, get_current, set_value
from opentelemetry.trace import DefaultSpan
from opentelemetry.util import time_ns

from .. import Span, SpanProcessor

logger = logging.getLogger(__name__)


class SpanExportResult(Enum):
    SUCCESS = 0
    FAILURE = 1


class SpanExporter:
    """Interface for exporting spans.

    Interface to be implemented by services that want to export recorded
    spans in their own format.

    To export data this MUST be registered to the
    :class:`opentelemetry.sdk.trace.Tracer` using a
    `SimpleExportSpanProcessor` or a `BatchExportSpanProcessor`.
    """

    def export(self, spans: typing.Sequence[Span]) -> "SpanExportResult":
        """Exports a batch of telemetry data.

        Args:
            spans: The list of `opentelemetry.trace.Span` objects to be
                exported

        Returns:
            The result of the export
        """

    def shutdown(self) -> None:
        """Shuts down the exporter.

        Called when the SDK is shut down.
        """

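
# --- Illustrative sketch, not part of this module ---------------------------
# A minimal custom SpanExporter that just collects finished spans in memory,
# e.g. for tests. The class name and behavior are assumptions made for
# illustration; the only requirement the interface above imposes is
# implementing export() and shutdown().
class _InMemorySpanExporterExample(SpanExporter):
    def __init__(self):
        self.finished_spans = []  # spans handed over by a span processor

    def export(self, spans: typing.Sequence[Span]) -> SpanExportResult:
        self.finished_spans.extend(spans)
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        self.finished_spans.clear()

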
class SimpleExportSpanProcessor(SpanProcessor):
    """Simple SpanProcessor implementation.

    SimpleExportSpanProcessor is an implementation of `SpanProcessor` that
    passes ended spans directly to the configured `SpanExporter`.
    """

    def __init__(self, span_exporter: SpanExporter):
        self.span_exporter = span_exporter

    def on_start(self, span: Span) -> None:
        pass

    def on_end(self, span: Span) -> None:
        token = attach(set_value("suppress_instrumentation", True))
        try:
            self.span_exporter.export((span,))
        # pylint: disable=broad-except
        except Exception:
            logger.exception("Exception while exporting Span.")
        detach(token)

    def shutdown(self) -> None:
        self.span_exporter.shutdown()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        # pylint: disable=unused-argument
        return True

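
# --- Usage sketch, not part of this module -----------------------------------
# How a SimpleExportSpanProcessor is typically wired up. The TracerProvider
# helpers below come from the 0.x opentelemetry-python API; exact helper names
# vary between releases, so treat this as an assumption-laden sketch rather
# than the canonical setup.
def _simple_processor_usage_example():
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider

    trace.set_tracer_provider(TracerProvider())
    trace.get_tracer_provider().add_span_processor(
        SimpleExportSpanProcessor(ConsoleSpanExporter())
    )
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("example-span"):
        pass  # on_end() hands the span to the exporter synchronously

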
class BatchExportSpanProcessor(SpanProcessor):
    """Batch span processor implementation.

    BatchExportSpanProcessor is an implementation of `SpanProcessor` that
    batches ended spans and pushes them to the configured `SpanExporter`.
    """

    _FLUSH_TOKEN_SPAN = DefaultSpan(context=None)

    def __init__(
        self,
        span_exporter: SpanExporter,
        max_queue_size: int = 2048,
        schedule_delay_millis: float = 5000,
        max_export_batch_size: int = 512,
    ):
        if max_queue_size <= 0:
            raise ValueError("max_queue_size must be a positive integer.")

        if schedule_delay_millis <= 0:
            raise ValueError("schedule_delay_millis must be positive.")

        if max_export_batch_size <= 0:
            raise ValueError(
                "max_export_batch_size must be a positive integer."
            )

        if max_export_batch_size > max_queue_size:
            raise ValueError(
                "max_export_batch_size must be less than or equal to max_queue_size."
            )

        self.span_exporter = span_exporter
        self.queue = collections.deque(
            [], max_queue_size
        )  # type: typing.Deque[Span]
        self.worker_thread = threading.Thread(target=self.worker, daemon=True)
        self.condition = threading.Condition(threading.Lock())
        self.flush_condition = threading.Condition(threading.Lock())
        # flag to indicate that there is a flush operation in progress
        self._flushing = False
        self.schedule_delay_millis = schedule_delay_millis
        self.max_export_batch_size = max_export_batch_size
        self.max_queue_size = max_queue_size
        self.done = False
        # flag that indicates that spans are being dropped
        self._spans_dropped = False
        # preallocated list to send spans to exporter
        self.spans_list = [
            None
        ] * self.max_export_batch_size  # type: typing.List[typing.Optional[Span]]
        self.worker_thread.start()

    def on_start(self, span: Span) -> None:
        pass

    def on_end(self, span: Span) -> None:
        if self.done:
            logger.warning("Already shutdown, dropping span.")
            return

        if len(self.queue) == self.max_queue_size:
            if not self._spans_dropped:
                logger.warning("Queue is full, likely spans will be dropped.")
                self._spans_dropped = True

        self.queue.appendleft(span)

        if len(self.queue) >= self.max_queue_size // 2:
            with self.condition:
                self.condition.notify()

    def worker(self):
        timeout = self.schedule_delay_millis / 1e3
        while not self.done:
            if (
                len(self.queue) < self.max_export_batch_size
                and not self._flushing
            ):
                with self.condition:
                    self.condition.wait(timeout)
                    if not self.queue:
                        # spurious notification, let's wait again
                        continue
                    if self.done:
                        # missing spans will be sent when calling flush
                        break

            # subtract the duration of this export call from the next timeout
            start = time_ns()
            self.export()
            end = time_ns()
            duration = (end - start) / 1e9
            timeout = self.schedule_delay_millis / 1e3 - duration

        # be sure that all spans are sent
        self._drain_queue()

    def export(self) -> None:
        """Exports at most max_export_batch_size spans."""
        idx = 0
        notify_flush = False
        # currently only a single thread acts as consumer, so queue.pop() will
        # not raise an exception
        while idx < self.max_export_batch_size and self.queue:
            span = self.queue.pop()
            if span is self._FLUSH_TOKEN_SPAN:
                notify_flush = True
            else:
                self.spans_list[idx] = span
                idx += 1
        token = attach(set_value("suppress_instrumentation", True))
        try:
            # Ignore type b/c the Optional[None]+slicing is too "clever"
            # for mypy
            self.span_exporter.export(self.spans_list[:idx])  # type: ignore
        # pylint: disable=broad-except
        except Exception:
            logger.exception("Exception while exporting Span batch.")
        detach(token)

        if notify_flush:
            with self.flush_condition:
                self.flush_condition.notify()

        # clean up list
        for index in range(idx):
            self.spans_list[index] = None

    def _drain_queue(self):
        """Export all elements until queue is empty.

        Can only be called from the worker thread context because it invokes
        `export` that is not thread safe.
        """
        while self.queue:
            self.export()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        if self.done:
            logger.warning("Already shutdown, ignoring call to force_flush().")
            return True

        self._flushing = True
        self.queue.appendleft(self._FLUSH_TOKEN_SPAN)

        # wake up worker thread
        with self.condition:
            self.condition.notify_all()

        # wait for token to be processed
        with self.flush_condition:
            ret = self.flush_condition.wait(timeout_millis / 1e3)

        self._flushing = False

        if not ret:
            logger.warning("Timeout was exceeded in force_flush().")
        return ret

    def shutdown(self) -> None:
        # signal the worker thread to finish and then wait for it
        self.done = True
        with self.condition:
            self.condition.notify_all()
        self.worker_thread.join()
        self.span_exporter.shutdown()

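
# --- Usage sketch, not part of this module -----------------------------------
# Shows the batching parameters accepted by BatchExportSpanProcessor.__init__
# and an explicit flush before shutdown; the values are simply the defaults
# and the exporter choice is arbitrary.
def _batch_processor_usage_example():
    processor = BatchExportSpanProcessor(
        ConsoleSpanExporter(),
        max_queue_size=2048,  # maximum number of buffered spans
        schedule_delay_millis=5000,  # delay between worker export cycles
        max_export_batch_size=512,  # upper bound per export() call
    )
    # ... register the processor with a TracerProvider and record spans ...
    processor.force_flush()  # blocks up to timeout_millis (30s by default)
    processor.shutdown()

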
class ConsoleSpanExporter(SpanExporter):
    """Implementation of :class:`SpanExporter` that prints spans to the
    console.

    This class can be used for diagnostic purposes. It prints the exported
    spans to the console STDOUT.
    """

    def __init__(
        self,
        out: typing.IO = sys.stdout,
        formatter: typing.Callable[[Span], str] = lambda span: span.to_json()
        + os.linesep,
    ):
        self.out = out
        self.formatter = formatter

    def export(self, spans: typing.Sequence[Span]) -> SpanExportResult:
        for span in spans:
            self.out.write(self.formatter(span))
        self.out.flush()
        return SpanExportResult.SUCCESS

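
# --- Usage sketch, not part of this module -----------------------------------
# ConsoleSpanExporter writes to any writable stream via a formatter callable;
# the StringIO target and the one-line formatter below are arbitrary examples,
# not defaults of this module.
def _console_exporter_usage_example():
    import io

    buffer = io.StringIO()
    exporter = ConsoleSpanExporter(
        out=buffer,
        formatter=lambda span: "{}{}".format(span.name, os.linesep),
    )
    # export() writes one formatted entry per span and flushes the stream;
    # spans normally arrive from a span processor rather than being built here.
    return exporter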