Source code for opentelemetry.sdk.trace

# Copyright 2019, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import atexit
import logging
import random
import threading
from contextlib import contextmanager
from numbers import Number
from types import TracebackType
from typing import Iterator, Optional, Sequence, Tuple, Type

from opentelemetry import context as context_api
from opentelemetry import trace as trace_api
from opentelemetry.sdk import util
from opentelemetry.sdk.util import BoundedDict, BoundedList
from opentelemetry.trace import SpanContext, sampling
from opentelemetry.trace.propagation import SPAN_KEY
from opentelemetry.trace.status import Status, StatusCanonicalCode
from opentelemetry.util import time_ns, types

logger = logging.getLogger(__name__)

MAX_NUM_ATTRIBUTES = 32
MAX_NUM_EVENTS = 128
MAX_NUM_LINKS = 32


[docs]class SpanProcessor: """Interface which allows hooks for SDK's `Span` start and end method invocations. Span processors can be registered directly using :func:`TracerSource.add_span_processor` and they are invoked in the same order as they were registered. """
[docs] def on_start(self, span: "Span") -> None: """Called when a :class:`opentelemetry.trace.Span` is started. This method is called synchronously on the thread that starts the span, therefore it should not block or throw an exception. Args: span: The :class:`opentelemetry.trace.Span` that just started. """
[docs] def on_end(self, span: "Span") -> None: """Called when a :class:`opentelemetry.trace.Span` is ended. This method is called synchronously on the thread that ends the span, therefore it should not block or throw an exception. Args: span: The :class:`opentelemetry.trace.Span` that just ended. """
[docs] def shutdown(self) -> None: """Called when a :class:`opentelemetry.sdk.trace.Tracer` is shutdown. """
[docs] def force_flush(self, timeout_millis: int = 30000) -> bool: """Export all ended spans to the configured Exporter that have not yet been exported. Args: timeout_millis: The maximum amount of time to wait for spans to be exported. Returns: False if the timeout is exceeded, True otherwise. """
[docs]class MultiSpanProcessor(SpanProcessor): """Implementation of :class:`SpanProcessor` that forwards all received events to a list of `SpanProcessor`. """ def __init__(self): # use a tuple to avoid race conditions when adding a new span and # iterating through it on "on_start" and "on_end". self._span_processors = () # type: Tuple[SpanProcessor, ...] self._lock = threading.Lock()
[docs] def add_span_processor(self, span_processor: SpanProcessor) -> None: """Adds a SpanProcessor to the list handled by this instance.""" with self._lock: self._span_processors = self._span_processors + (span_processor,)
[docs] def on_start(self, span: "Span") -> None: for sp in self._span_processors: sp.on_start(span)
[docs] def on_end(self, span: "Span") -> None: for sp in self._span_processors: sp.on_end(span)
[docs] def shutdown(self) -> None: for sp in self._span_processors: sp.shutdown()
[docs]class Span(trace_api.Span): """See `opentelemetry.trace.Span`. Users should create `Span` objects via the `Tracer` instead of this constructor. Args: name: The name of the operation this span represents context: The immutable span context parent: This span's parent, may be a `SpanContext` if the parent is remote, null if this is a root span sampler: The sampler used to create this span trace_config: TODO resource: TODO attributes: The span's attributes to be exported events: Timestamped events to be exported links: Links to other spans to be exported span_processor: `SpanProcessor` to invoke when starting and ending this `Span`. """ # Initialize these lazily assuming most spans won't have them. empty_attributes = BoundedDict(MAX_NUM_ATTRIBUTES) empty_events = BoundedList(MAX_NUM_EVENTS) empty_links = BoundedList(MAX_NUM_LINKS) def __init__( self, name: str, context: trace_api.SpanContext, parent: trace_api.ParentSpan = None, sampler: Optional[sampling.Sampler] = None, trace_config: None = None, # TODO resource: None = None, # TODO attributes: types.Attributes = None, # TODO events: Sequence[trace_api.Event] = None, # TODO links: Sequence[trace_api.Link] = (), kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL, span_processor: SpanProcessor = SpanProcessor(), instrumentation_info: "InstrumentationInfo" = None, set_status_on_exception: bool = True, ) -> None: self.name = name self.context = context self.parent = parent self.sampler = sampler self.trace_config = trace_config self.resource = resource self.kind = kind self._set_status_on_exception = set_status_on_exception self.span_processor = span_processor self.status = None self._lock = threading.Lock() if attributes is None: self.attributes = Span.empty_attributes else: self.attributes = BoundedDict.from_map( MAX_NUM_ATTRIBUTES, attributes ) if events is None: self.events = Span.empty_events else: self.events = BoundedList.from_seq(MAX_NUM_EVENTS, events) if links is None: self.links = Span.empty_links else: self.links = BoundedList.from_seq(MAX_NUM_LINKS, links) self._end_time = None # type: Optional[int] self._start_time = None # type: Optional[int] self.instrumentation_info = instrumentation_info @property def start_time(self): return self._start_time @property def end_time(self): return self._end_time def __repr__(self): return '{}(name="{}", context={})'.format( type(self).__name__, self.name, self.context ) def __str__(self): return ( '{}(name="{}", context={}, kind={}, ' "parent={}, start_time={}, end_time={})" ).format( type(self).__name__, self.name, self.context, self.kind, repr(self.parent), util.ns_to_iso_str(self.start_time) if self.start_time else "None", util.ns_to_iso_str(self.end_time) if self.end_time else "None", )
[docs] def get_context(self): return self.context
[docs] def set_attribute(self, key: str, value: types.AttributeValue) -> None: with self._lock: if not self.is_recording_events(): return has_ended = self.end_time is not None if not has_ended: if self.attributes is Span.empty_attributes: self.attributes = BoundedDict(MAX_NUM_ATTRIBUTES) if has_ended: logger.warning("Setting attribute on ended span.") return if isinstance(value, Sequence): error_message = self._check_attribute_value_sequence(value) if error_message is not None: logger.warning("%s in attribute value sequence", error_message) return elif not isinstance(value, (bool, str, Number, Sequence)): logger.warning("invalid type for attribute value") return self.attributes[key] = value
@staticmethod def _check_attribute_value_sequence(sequence: Sequence) -> Optional[str]: """ Checks if sequence items are valid and are of the same type """ if len(sequence) == 0: return None first_element_type = type(sequence[0]) if issubclass(first_element_type, Number): first_element_type = Number if first_element_type not in (bool, str, Number): return "invalid type" for element in sequence: if not isinstance(element, first_element_type): return "different type" return None
[docs] def add_event( self, name: str, attributes: types.Attributes = None, timestamp: Optional[int] = None, ) -> None: self.add_lazy_event( trace_api.Event( name, Span.empty_attributes if attributes is None else attributes, time_ns() if timestamp is None else timestamp, ) )
[docs] def add_lazy_event(self, event: trace_api.Event) -> None: with self._lock: if not self.is_recording_events(): return has_ended = self.end_time is not None if not has_ended: if self.events is Span.empty_events: self.events = BoundedList(MAX_NUM_EVENTS) if has_ended: logger.warning("Calling add_event() on an ended span.") return self.events.append(event)
[docs] def start(self, start_time: Optional[int] = None) -> None: with self._lock: if not self.is_recording_events(): return has_started = self.start_time is not None if not has_started: self._start_time = ( start_time if start_time is not None else time_ns() ) if has_started: logger.warning("Calling start() on a started span.") return self.span_processor.on_start(self)
[docs] def end(self, end_time: Optional[int] = None) -> None: with self._lock: if not self.is_recording_events(): return if self.start_time is None: raise RuntimeError("Calling end() on a not started span.") has_ended = self.end_time is not None if not has_ended: if self.status is None: self.status = Status(canonical_code=StatusCanonicalCode.OK) self._end_time = ( end_time if end_time is not None else time_ns() ) if has_ended: logger.warning("Calling end() on an ended span.") return self.span_processor.on_end(self)
[docs] def update_name(self, name: str) -> None: with self._lock: has_ended = self.end_time is not None if has_ended: logger.warning("Calling update_name() on an ended span.") return self.name = name
[docs] def is_recording_events(self) -> bool: return True
[docs] def set_status(self, status: trace_api.Status) -> None: with self._lock: has_ended = self.end_time is not None if has_ended: logger.warning("Calling set_status() on an ended span.") return self.status = status
def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: """Ends context manager and calls `end` on the `Span`.""" if ( self.status is None and self._set_status_on_exception and exc_val is not None ): self.set_status( Status( canonical_code=StatusCanonicalCode.UNKNOWN, description="{}: {}".format(exc_type.__name__, exc_val), ) ) super().__exit__(exc_type, exc_val, exc_tb)
[docs]def generate_span_id() -> int: """Get a new random span ID. Returns: A random 64-bit int for use as a span ID """ return random.getrandbits(64)
[docs]def generate_trace_id() -> int: """Get a new random trace ID. Returns: A random 128-bit int for use as a trace ID """ return random.getrandbits(128)
[docs]class InstrumentationInfo: """Immutable information about an instrumentation library module. See `TracerSource.get_tracer` for the meaning of the properties. """ __slots__ = ("_name", "_version") def __init__(self, name: str, version: str): self._name = name self._version = version def __repr__(self): return "{}({}, {})".format( type(self).__name__, self._name, self._version ) def __hash__(self): return hash((self._name, self._version)) def __eq__(self, value): return type(value) is type(self) and (self._name, self._version) == ( value._name, value._version, ) def __lt__(self, value): if type(value) is not type(self): return NotImplemented return (self._name, self._version) < (value._name, value._version) @property def version(self) -> str: return self._version @property def name(self) -> str: return self._name
[docs]class Tracer(trace_api.Tracer): """See `opentelemetry.trace.Tracer`. Args: name: The name of the tracer. shutdown_on_exit: Register an atexit hook to shut down the tracer when the application exits. """ def __init__( self, source: "TracerSource", instrumentation_info: InstrumentationInfo ) -> None: self.source = source self.instrumentation_info = instrumentation_info
[docs] def get_current_span(self): """See `opentelemetry.trace.Tracer.get_current_span`.""" return self.source.get_current_span()
[docs] def start_as_current_span( self, name: str, parent: trace_api.ParentSpan = trace_api.Tracer.CURRENT_SPAN, kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL, attributes: Optional[types.Attributes] = None, links: Sequence[trace_api.Link] = (), ) -> Iterator[trace_api.Span]: """See `opentelemetry.trace.Tracer.start_as_current_span`.""" span = self.start_span(name, parent, kind, attributes, links) return self.use_span(span, end_on_exit=True)
[docs] def start_span( # pylint: disable=too-many-locals self, name: str, parent: trace_api.ParentSpan = trace_api.Tracer.CURRENT_SPAN, kind: trace_api.SpanKind = trace_api.SpanKind.INTERNAL, attributes: Optional[types.Attributes] = None, links: Sequence[trace_api.Link] = (), start_time: Optional[int] = None, set_status_on_exception: bool = True, ) -> trace_api.Span: """See `opentelemetry.trace.Tracer.start_span`.""" if parent is Tracer.CURRENT_SPAN: parent = self.get_current_span() parent_context = parent if isinstance(parent_context, trace_api.Span): parent_context = parent.get_context() if parent_context is not None and not isinstance( parent_context, trace_api.SpanContext ): raise TypeError if parent_context is None or not parent_context.is_valid(): parent = parent_context = None trace_id = generate_trace_id() trace_options = None trace_state = None else: trace_id = parent_context.trace_id trace_options = parent_context.trace_options trace_state = parent_context.trace_state context = trace_api.SpanContext( trace_id, generate_span_id(), trace_options, trace_state ) # The sampler decides whether to create a real or no-op span at the # time of span creation. No-op spans do not record events, and are not # exported. # The sampler may also add attributes to the newly-created span, e.g. # to include information about the sampling decision. sampling_decision = self.source.sampler.should_sample( parent_context, context.trace_id, context.span_id, name, attributes, links, ) if sampling_decision.sampled: options = context.trace_options | trace_api.TraceOptions.SAMPLED context.trace_options = trace_api.TraceOptions(options) if attributes is None: span_attributes = sampling_decision.attributes else: # apply sampling decision attributes after initial attributes span_attributes = attributes.copy() span_attributes.update(sampling_decision.attributes) span = Span( name=name, context=context, parent=parent, sampler=self.source.sampler, attributes=span_attributes, span_processor=self.source._active_span_processor, # pylint:disable=protected-access kind=kind, links=links, instrumentation_info=self.instrumentation_info, set_status_on_exception=set_status_on_exception, ) span.start(start_time=start_time) else: span = trace_api.DefaultSpan(context=context) return span
[docs] @contextmanager def use_span( self, span: trace_api.Span, end_on_exit: bool = False ) -> Iterator[trace_api.Span]: """See `opentelemetry.trace.Tracer.use_span`.""" try: context_snapshot = context_api.get_current() context_api.set_current(context_api.set_value(SPAN_KEY, span)) try: yield span finally: context_api.set_current(context_snapshot) except Exception as error: # pylint: disable=broad-except if ( span.status is None and span._set_status_on_exception # pylint:disable=protected-access # noqa ): span.set_status( Status( canonical_code=StatusCanonicalCode.UNKNOWN, description="{}: {}".format( type(error).__name__, error ), ) ) raise finally: if end_on_exit: span.end()
[docs]class TracerSource(trace_api.TracerSource): def __init__( self, sampler: sampling.Sampler = trace_api.sampling.ALWAYS_ON, shutdown_on_exit: bool = True, ): self._active_span_processor = MultiSpanProcessor() self.sampler = sampler self._atexit_handler = None if shutdown_on_exit: self._atexit_handler = atexit.register(self.shutdown)
[docs] def get_tracer( self, instrumenting_module_name: str, instrumenting_library_version: str = "", ) -> "trace_api.Tracer": if not instrumenting_module_name: # Reject empty strings too. instrumenting_module_name = "ERROR:MISSING MODULE NAME" logger.error("get_tracer called with missing module name.") return Tracer( self, InstrumentationInfo( instrumenting_module_name, instrumenting_library_version ), )
[docs] @staticmethod def get_current_span() -> Span: return context_api.get_value(SPAN_KEY) # type: ignore
[docs] def add_span_processor(self, span_processor: SpanProcessor) -> None: """Registers a new :class:`SpanProcessor` for this `TracerSource`. The span processors are invoked in the same order they are registered. """ # no lock here because MultiSpanProcessor.add_span_processor is # thread safe self._active_span_processor.add_span_processor(span_processor)
[docs] def shutdown(self): """Shut down the span processors added to the tracer.""" self._active_span_processor.shutdown() if self._atexit_handler is not None: atexit.unregister(self._atexit_handler) self._atexit_handler = None