Source code for opentelemetry.ext.jaeger

# Copyright 2018, OpenCensus Authors
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The **OpenTelemetry Jaeger Exporter** allows to export `OpenTelemetry`_ traces to `Jaeger`_.
This exporter always send traces to the configured agent using Thrift compact protocol over UDP.
An optional collector can be configured, in this case Thrift binary protocol over HTTP is used.
gRPC is still not supported by this implementation.

Usage
-----

.. code:: python

    from opentelemetry import trace
    from opentelemetry.ext import jaeger
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchExportSpanProcessor

    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer(__name__)

    # create a JaegerSpanExporter
    jaeger_exporter = jaeger.JaegerSpanExporter(
        service_name='my-helloworld-service',
        # configure agent
        agent_host_name='localhost',
        agent_port=6831,
        # optional: configure also collector
        # collector_host_name='localhost',
        # collector_port=14268,
        # collector_endpoint='/api/traces?format=jaeger.thrift',
        # username=xxxx, # optional
        # password=xxxx, # optional
    )

    # Create a BatchExportSpanProcessor and add the exporter to it
    span_processor = BatchExportSpanProcessor(jaeger_exporter)

    # add to the tracer
    trace.get_tracer_provider().add_span_processor(span_processor)

    with tracer.start_as_current_span('foo'):
        print('Hello world!')

API
---
.. _Jaeger: https://www.jaegertracing.io/
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
"""

import base64
import logging
import socket

from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.transport import THttpClient, TTransport

import opentelemetry.trace as trace_api
from opentelemetry.ext.jaeger.gen.agent import Agent as agent
from opentelemetry.ext.jaeger.gen.jaeger import Collector as jaeger
from opentelemetry.sdk.trace.export import Span, SpanExporter, SpanExportResult
from opentelemetry.trace.status import StatusCanonicalCode

DEFAULT_AGENT_HOST_NAME = "localhost"
DEFAULT_AGENT_PORT = 6831
DEFAULT_COLLECTOR_ENDPOINT = "/api/traces?format=jaeger.thrift"

UDP_PACKET_MAX_LENGTH = 65000

logger = logging.getLogger(__name__)


[docs]class JaegerSpanExporter(SpanExporter): """Jaeger span exporter for OpenTelemetry. Args: service_name: Service that logged an annotation in a trace.Classifier when query for spans. agent_host_name: The host name of the Jaeger-Agent. agent_port: The port of the Jaeger-Agent. collector_host_name: The host name of the Jaeger-Collector HTTP Thrift. collector_port: The port of the Jaeger-Collector HTTP Thrift. collector_endpoint: The endpoint of the Jaeger-Collector HTTP Thrift. username: The user name of the Basic Auth if authentication is required. password: The password of the Basic Auth if authentication is required. """ def __init__( self, service_name, agent_host_name=DEFAULT_AGENT_HOST_NAME, agent_port=DEFAULT_AGENT_PORT, collector_host_name=None, collector_port=None, collector_endpoint=DEFAULT_COLLECTOR_ENDPOINT, username=None, password=None, ): self.service_name = service_name self.agent_host_name = agent_host_name self.agent_port = agent_port self._agent_client = None self.collector_host_name = collector_host_name self.collector_port = collector_port self.collector_endpoint = collector_endpoint self.username = username self.password = password self._collector = None @property def agent_client(self): if self._agent_client is None: self._agent_client = AgentClientUDP( host_name=self.agent_host_name, port=self.agent_port ) return self._agent_client @property def collector(self): if self._collector is not None: return self._collector if self.collector_host_name is None or self.collector_port is None: return None thrift_url = "http://{}:{}{}".format( self.collector_host_name, self.collector_port, self.collector_endpoint, ) auth = None if self.username is not None and self.password is not None: auth = (self.username, self.password) self._collector = Collector(thrift_url=thrift_url, auth=auth) return self._collector
[docs] def export(self, spans): jaeger_spans = _translate_to_jaeger(spans) batch = jaeger.Batch( spans=jaeger_spans, process=jaeger.Process(serviceName=self.service_name), ) if self.collector is not None: self.collector.submit(batch) self.agent_client.emit(batch) return SpanExportResult.SUCCESS
[docs] def shutdown(self): pass
def _nsec_to_usec_round(nsec): """Round nanoseconds to microseconds""" return (nsec + 500) // 10 ** 3 def _translate_to_jaeger(spans: Span): """Translate the spans to Jaeger format. Args: spans: Tuple of spans to convert """ jaeger_spans = [] for span in spans: ctx = span.get_context() trace_id = ctx.trace_id span_id = ctx.span_id start_time_us = _nsec_to_usec_round(span.start_time) duration_us = _nsec_to_usec_round(span.end_time - span.start_time) status = span.status parent_id = span.parent.span_id if span.parent else 0 tags = _extract_tags(span.attributes) tags.extend( [ _get_long_tag("status.code", status.canonical_code.value), _get_string_tag("status.message", status.description), _get_string_tag("span.kind", span.kind.name), ] ) # Ensure that if Status.Code is not OK, that we set the "error" tag on the Jaeger span. if status.canonical_code is not StatusCanonicalCode.OK: tags.append(_get_bool_tag("error", True)) refs = _extract_refs_from_span(span) logs = _extract_logs_from_span(span) flags = int(ctx.trace_flags) jaeger_span = jaeger.Span( traceIdHigh=_get_trace_id_high(trace_id), traceIdLow=_get_trace_id_low(trace_id), # generated code expects i64 spanId=_convert_int_to_i64(span_id), operationName=span.name, startTime=start_time_us, duration=duration_us, tags=tags, logs=logs, references=refs, flags=flags, parentSpanId=_convert_int_to_i64(parent_id), ) jaeger_spans.append(jaeger_span) return jaeger_spans def _extract_refs_from_span(span): if not span.links: return None refs = [] for link in span.links: trace_id = link.context.trace_id span_id = link.context.span_id refs.append( jaeger.SpanRef( refType=jaeger.SpanRefType.FOLLOWS_FROM, traceIdHigh=_get_trace_id_high(trace_id), traceIdLow=_get_trace_id_low(trace_id), spanId=_convert_int_to_i64(span_id), ) ) return refs def _convert_int_to_i64(val): """Convert integer to signed int64 (i64)""" if val > 0x7FFFFFFFFFFFFFFF: val -= 0x10000000000000000 return val def _get_trace_id_low(trace_id): return _convert_int_to_i64(trace_id & 0xFFFFFFFFFFFFFFFF) def _get_trace_id_high(trace_id): return _convert_int_to_i64((trace_id >> 64) & 0xFFFFFFFFFFFFFFFF) def _extract_logs_from_span(span): if not span.events: return None logs = [] for event in span.events: fields = _extract_tags(event.attributes) fields.append( jaeger.Tag( key="message", vType=jaeger.TagType.STRING, vStr=event.name ) ) event_timestamp_us = _nsec_to_usec_round(event.timestamp) logs.append( jaeger.Log(timestamp=int(event_timestamp_us), fields=fields) ) return logs def _extract_tags(attr): if not attr: return [] tags = [] for attribute_key, attribute_value in attr.items(): tag = _convert_attribute_to_tag(attribute_key, attribute_value) if tag is None: continue tags.append(tag) return tags def _convert_attribute_to_tag(key, attr): """Convert the attributes to jaeger tags.""" if isinstance(attr, bool): return jaeger.Tag(key=key, vBool=attr, vType=jaeger.TagType.BOOL) if isinstance(attr, str): return jaeger.Tag(key=key, vStr=attr, vType=jaeger.TagType.STRING) if isinstance(attr, int): return jaeger.Tag(key=key, vLong=attr, vType=jaeger.TagType.LONG) if isinstance(attr, float): return jaeger.Tag(key=key, vDouble=attr, vType=jaeger.TagType.DOUBLE) logger.warning("Could not serialize attribute %s:%r to tag", key, attr) return None def _get_long_tag(key, val): return jaeger.Tag(key=key, vLong=val, vType=jaeger.TagType.LONG) def _get_string_tag(key, val): return jaeger.Tag(key=key, vStr=val, vType=jaeger.TagType.STRING) def _get_bool_tag(key, val): return jaeger.Tag(key=key, vBool=val, vType=jaeger.TagType.BOOL)
[docs]class AgentClientUDP: """Implement a UDP client to agent. Args: host_name: The host name of the Jaeger server. port: The port of the Jaeger server. max_packet_size: Maximum size of UDP packet. client: Class for creating new client objects for agencies. """ def __init__( self, host_name, port, max_packet_size=UDP_PACKET_MAX_LENGTH, client=agent.Client, ): self.address = (host_name, port) self.max_packet_size = max_packet_size self.buffer = TTransport.TMemoryBuffer() self.client = client( iprot=TCompactProtocol.TCompactProtocol(trans=self.buffer) )
[docs] def emit(self, batch: jaeger.Batch): """ Args: batch: Object to emit Jaeger spans. """ # pylint: disable=protected-access self.client._seqid = 0 # truncate and reset the position of BytesIO object self.buffer._buffer.truncate(0) self.buffer._buffer.seek(0) self.client.emitBatch(batch) buff = self.buffer.getvalue() if len(buff) > self.max_packet_size: logger.warning( "Data exceeds the max UDP packet size; size %r, max %r", len(buff), self.max_packet_size, ) return with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket: udp_socket.sendto(buff, self.address)
[docs]class Collector: """Submits collected spans to Thrift HTTP server. Args: thrift_url: URL of the Jaeger HTTP Thrift. auth: Auth tuple that contains username and password for Basic Auth. """ def __init__(self, thrift_url="", auth=None): self.thrift_url = thrift_url self.auth = auth self.http_transport = THttpClient.THttpClient( uri_or_host=self.thrift_url ) self.protocol = TBinaryProtocol.TBinaryProtocol(self.http_transport) # set basic auth header if auth is not None: auth_header = "{}:{}".format(*auth) decoded = base64.b64encode(auth_header.encode()).decode("ascii") basic_auth = dict(Authorization="Basic {}".format(decoded)) self.http_transport.setCustomHeaders(basic_auth)
[docs] def submit(self, batch: jaeger.Batch): """Submits batches to Thrift HTTP Server through Binary Protocol. Args: batch: Object to emit Jaeger spans. """ batch.write(self.protocol) self.http_transport.flush() code = self.http_transport.code msg = self.http_transport.message if code >= 300 or code < 200: logger.error( "Traces cannot be uploaded; HTTP status code: %s, message: %s", code, msg, )