Source code for opentelemetry.ext.zipkin

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This library allows to export tracing data to `Zipkin <https://zipkin.io/>`_.

Usage
-----

The **OpenTelemetry Zipkin Exporter** allows to export `OpenTelemetry`_ traces to `Zipkin`_.
This exporter always send traces to the configured Zipkin collector using HTTP.


.. _Zipkin: https://zipkin.io/
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/

.. code:: python

    from opentelemetry import trace
    from opentelemetry.ext import zipkin
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchExportSpanProcessor

    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer(__name__)

    # create a ZipkinSpanExporter
    zipkin_exporter = zipkin.ZipkinSpanExporter(
        service_name="my-helloworld-service",
        # optional:
        # host_name="localhost",
        # port=9411,
        # endpoint="/api/v2/spans",
        # protocol="http",
        # ipv4="",
        # ipv6="",
        # retry=False,
    )

    # Create a BatchExportSpanProcessor and add the exporter to it
    span_processor = BatchExportSpanProcessor(zipkin_exporter)

    # add to the tracer
    trace.get_tracer_provider().add_span_processor(span_processor)

    with tracer.start_as_current_span("foo"):
        print("Hello world!")

API
---
"""

import json
import logging
from typing import Optional, Sequence

import requests

from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.trace import Span, SpanContext, SpanKind

DEFAULT_ENDPOINT = "/api/v2/spans"
DEFAULT_HOST_NAME = "localhost"
DEFAULT_PORT = 9411
DEFAULT_PROTOCOL = "http"
DEFAULT_RETRY = False
ZIPKIN_HEADERS = {"Content-Type": "application/json"}

SPAN_KIND_MAP = {
    SpanKind.INTERNAL: None,
    SpanKind.SERVER: "SERVER",
    SpanKind.CLIENT: "CLIENT",
    SpanKind.PRODUCER: "PRODUCER",
    SpanKind.CONSUMER: "CONSUMER",
}

SUCCESS_STATUS_CODES = (200, 202)

logger = logging.getLogger(__name__)


[docs]class ZipkinSpanExporter(SpanExporter): """Zipkin span exporter for OpenTelemetry. Args: service_name: Service that logged an annotation in a trace.Classifier when query for spans. host_name: The host name of the Zipkin server port: The port of the Zipkin server endpoint: The endpoint of the Zipkin server protocol: The protocol used for the request. ipv4: Primary IPv4 address associated with this connection. ipv6: Primary IPv6 address associated with this connection. retry: Set to True to configure the exporter to retry on failure. """ def __init__( self, service_name: str, host_name: str = DEFAULT_HOST_NAME, port: int = DEFAULT_PORT, endpoint: str = DEFAULT_ENDPOINT, protocol: str = DEFAULT_PROTOCOL, ipv4: Optional[str] = None, ipv6: Optional[str] = None, retry: Optional[str] = DEFAULT_RETRY, ): self.service_name = service_name self.host_name = host_name self.port = port self.endpoint = endpoint self.protocol = protocol self.url = "{}://{}:{}{}".format( self.protocol, self.host_name, self.port, self.endpoint ) self.ipv4 = ipv4 self.ipv6 = ipv6 self.retry = retry
[docs] def export(self, spans: Sequence[Span]) -> SpanExportResult: zipkin_spans = self._translate_to_zipkin(spans) result = requests.post( url=self.url, data=json.dumps(zipkin_spans), headers=ZIPKIN_HEADERS ) if result.status_code not in SUCCESS_STATUS_CODES: logger.error( "Traces cannot be uploaded; status code: %s, message %s", result.status_code, result.text, ) if self.retry: return SpanExportResult.FAILURE return SpanExportResult.FAILURE return SpanExportResult.SUCCESS
def _translate_to_zipkin(self, spans: Sequence[Span]): local_endpoint = {"serviceName": self.service_name, "port": self.port} if self.ipv4 is not None: local_endpoint["ipv4"] = self.ipv4 if self.ipv6 is not None: local_endpoint["ipv6"] = self.ipv6 zipkin_spans = [] for span in spans: context = span.get_context() trace_id = context.trace_id span_id = context.span_id # Timestamp in zipkin spans is int of microseconds. # see: https://zipkin.io/pages/instrumenting.html start_timestamp_mus = _nsec_to_usec_round(span.start_time) duration_mus = _nsec_to_usec_round(span.end_time - span.start_time) zipkin_span = { "traceId": format(trace_id, "x"), "id": format(span_id, "x"), "name": span.name, "timestamp": start_timestamp_mus, "duration": duration_mus, "localEndpoint": local_endpoint, "kind": SPAN_KIND_MAP[span.kind], "tags": _extract_tags_from_span(span.attributes), "annotations": _extract_annotations_from_events(span.events), } if context.trace_flags.sampled: zipkin_span["debug"] = True if isinstance(span.parent, Span): zipkin_span["parentId"] = format( span.parent.get_context().span_id, "x" ) elif isinstance(span.parent, SpanContext): zipkin_span["parentId"] = format(span.parent.span_id, "x") zipkin_spans.append(zipkin_span) return zipkin_spans
[docs] def shutdown(self) -> None: pass
def _extract_tags_from_span(attr): if not attr: return None tags = {} for attribute_key, attribute_value in attr.items(): if isinstance(attribute_value, (int, bool, float)): value = str(attribute_value) elif isinstance(attribute_value, str): value = attribute_value[:128] else: logger.warning("Could not serialize tag %s", attribute_key) continue tags[attribute_key] = value return tags def _extract_annotations_from_events(events): return ( [ {"timestamp": _nsec_to_usec_round(e.timestamp), "value": e.name} for e in events ] if events else None ) def _nsec_to_usec_round(nsec): """Round nanoseconds to microseconds""" return (nsec + 500) // 10 ** 3