# Copyright 2018, OpenCensus Authors
# Copyright 2019, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Jaeger Span Exporter for OpenTelemetry."""
import base64
import logging
import socket
from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.transport import THttpClient, TTransport
import opentelemetry.trace as trace_api
from opentelemetry.ext.jaeger.gen.agent import Agent as agent
from opentelemetry.ext.jaeger.gen.jaeger import Collector as jaeger
from opentelemetry.sdk.trace.export import Span, SpanExporter, SpanExportResult
from opentelemetry.trace.status import StatusCanonicalCode
# Host name of the Jaeger agent used when the caller does not supply one.
DEFAULT_AGENT_HOST_NAME = "localhost"
# Port of the Jaeger agent (reached over UDP) used when the caller does not
# supply one.
DEFAULT_AGENT_PORT = 6831
# Path and query string appended to "http://<host>:<port>" to build the
# collector's HTTP Thrift URL.
DEFAULT_COLLECTOR_ENDPOINT = "/api/traces?format=jaeger.thrift"
# Default upper bound on the length of a serialized batch that AgentClientUDP
# will send; larger payloads are dropped with a warning.
UDP_PACKET_MAX_LENGTH = 65000
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class JaegerSpanExporter(SpanExporter):
    """Jaeger span exporter for OpenTelemetry.

    Spans are translated to the Jaeger Thrift model. When both a collector
    host name and port are configured the batch is submitted to the
    Jaeger-Collector over HTTP; otherwise it is emitted to the Jaeger-Agent
    over UDP.

    Args:
        service_name: Service that logged an annotation in a trace.
            Classifier when querying for spans.
        agent_host_name: The host name of the Jaeger-Agent.
        agent_port: The port of the Jaeger-Agent.
        collector_host_name: The host name of the Jaeger-Collector HTTP
            Thrift.
        collector_port: The port of the Jaeger-Collector HTTP Thrift.
        collector_endpoint: The endpoint of the Jaeger-Collector HTTP Thrift.
        username: The user name of the Basic Auth if authentication is
            required.
        password: The password of the Basic Auth if authentication is
            required.
    """

    def __init__(
        self,
        service_name,
        agent_host_name=DEFAULT_AGENT_HOST_NAME,
        agent_port=DEFAULT_AGENT_PORT,
        collector_host_name=None,
        collector_port=None,
        collector_endpoint=DEFAULT_COLLECTOR_ENDPOINT,
        username=None,
        password=None,
    ):
        self.service_name = service_name
        self.agent_host_name = agent_host_name
        self.agent_port = agent_port
        # Both clients are created lazily by the properties below.
        self._agent_client = None
        self.collector_host_name = collector_host_name
        self.collector_port = collector_port
        self.collector_endpoint = collector_endpoint
        self.username = username
        self.password = password
        self._collector = None

    @property
    def agent_client(self):
        """Return the UDP agent client, creating it on first access."""
        if self._agent_client is None:
            self._agent_client = AgentClientUDP(
                host_name=self.agent_host_name, port=self.agent_port
            )
        return self._agent_client

    @property
    def collector(self):
        """Return the HTTP collector client, or ``None`` if not configured.

        The client is created on first access and cached. A collector is
        considered configured only when both ``collector_host_name`` and
        ``collector_port`` are set. Basic Auth is enabled only when both
        ``username`` and ``password`` are set.
        """
        if self._collector is not None:
            return self._collector

        if self.collector_host_name is None or self.collector_port is None:
            return None

        thrift_url = "http://{}:{}{}".format(
            self.collector_host_name,
            self.collector_port,
            self.collector_endpoint,
        )

        auth = None
        if self.username is not None and self.password is not None:
            auth = (self.username, self.password)

        self._collector = Collector(thrift_url=thrift_url, auth=auth)
        return self._collector

    def export(self, spans):
        """Translate ``spans`` to a Jaeger batch and send it.

        Args:
            spans: The spans to export.

        Returns:
            ``SpanExportResult.SUCCESS`` once the batch has been handed to
            the collector or agent client without raising.
        """
        batch = jaeger.Batch(
            spans=_translate_to_jaeger(spans),
            process=jaeger.Process(serviceName=self.service_name),
        )

        # Send each batch through exactly one channel. Emitting to the agent
        # in addition to a configured collector would deliver every batch
        # twice.
        collector = self.collector
        if collector is not None:
            collector.submit(batch)
        else:
            self.agent_client.emit(batch)

        return SpanExportResult.SUCCESS

    def shutdown(self):
        """Do nothing; the exporter holds no resources to release."""
def _nsec_to_usec_round(nsec):
"""Round nanoseconds to microseconds"""
return (nsec + 500) // 10 ** 3
def _translate_to_jaeger(spans: Span):
    """Convert SDK spans into Jaeger Thrift spans.

    Args:
        spans: Iterable of spans to convert. Each span is read for its
            context, start/end time, status, parent, attributes, kind and
            name, and is passed to the link and event extraction helpers.

    Returns:
        A list with one ``jaeger.Span`` per input span, in input order.
    """
    translated = []

    for span in spans:
        context = span.get_context()
        trace_id = context.trace_id
        span_id = context.span_id

        # Jaeger timestamps and durations are expressed in microseconds.
        start_us = _nsec_to_usec_round(span.start_time)
        elapsed_us = _nsec_to_usec_round(span.end_time - span.start_time)

        status = span.status

        # The parent may be a Span or a bare SpanContext; anything else is
        # reported as parent id 0.
        parent = span.parent
        if isinstance(parent, trace_api.Span):
            parent_id = parent.get_context().span_id
        elif isinstance(parent, trace_api.SpanContext):
            parent_id = parent.span_id
        else:
            parent_id = 0

        tags = _extract_tags(span.attributes)
        tags.append(_get_long_tag("status.code", status.canonical_code.value))
        tags.append(_get_string_tag("status.message", status.description))
        tags.append(_get_string_tag("span.kind", span.kind.name))

        # Any status other than OK marks the Jaeger span with an "error" tag.
        if status.canonical_code is not StatusCanonicalCode.OK:
            tags.append(_get_bool_tag("error", True))

        refs = _extract_refs_from_span(span)
        logs = _extract_logs_from_span(span)
        flags = int(context.trace_options)

        translated.append(
            jaeger.Span(
                traceIdHigh=_get_trace_id_high(trace_id),
                traceIdLow=_get_trace_id_low(trace_id),
                # the generated Thrift code expects signed 64-bit ids
                spanId=_convert_int_to_i64(span_id),
                parentSpanId=_convert_int_to_i64(parent_id),
                operationName=span.name,
                startTime=start_us,
                duration=elapsed_us,
                flags=flags,
                tags=tags,
                logs=logs,
                references=refs,
            )
        )

    return translated
def _extract_refs_from_span(span):
    """Build Jaeger span references from the links of ``span``.

    Returns:
        ``None`` when the span has no links; otherwise a list holding one
        ``FOLLOWS_FROM`` ``jaeger.SpanRef`` per link, in link order.
    """
    links = span.links
    if not links:
        return None

    return [
        jaeger.SpanRef(
            refType=jaeger.SpanRefType.FOLLOWS_FROM,
            traceIdHigh=_get_trace_id_high(linked.trace_id),
            traceIdLow=_get_trace_id_low(linked.trace_id),
            spanId=_convert_int_to_i64(linked.span_id),
        )
        for linked in (link.context for link in links)
    ]
def _convert_int_to_i64(val):
"""Convert integer to signed int64 (i64)"""
if val > 0x7FFFFFFFFFFFFFFF:
val -= 0x10000000000000000
return val
def _get_trace_id_low(trace_id):
    """Return the low 64 bits of ``trace_id`` as a signed 64-bit integer."""
    low_bits = trace_id & 0xFFFFFFFFFFFFFFFF
    return _convert_int_to_i64(low_bits)
def _get_trace_id_high(trace_id):
    """Return bits 64-127 of ``trace_id`` as a signed 64-bit integer."""
    high_bits = (trace_id >> 64) & 0xFFFFFFFFFFFFFFFF
    return _convert_int_to_i64(high_bits)
def _extract_logs_from_span(span):
    """Build Jaeger logs from the events of ``span``.

    Each event becomes one ``jaeger.Log`` whose fields are the event's
    attributes converted to tags, followed by a string tag named
    ``"message"`` carrying the event name. The timestamp is converted from
    nanoseconds to microseconds.

    Returns:
        ``None`` when the span has no events; otherwise the list of logs in
        event order.
    """
    events = span.events
    if not events:
        return None

    def _event_to_log(event):
        fields = _extract_tags(event.attributes)
        message_tag = jaeger.Tag(
            key="message", vType=jaeger.TagType.STRING, vStr=event.name
        )
        fields.append(message_tag)
        timestamp_us = _nsec_to_usec_round(event.timestamp)
        return jaeger.Log(timestamp=int(timestamp_us), fields=fields)

    return [_event_to_log(event) for event in events]
def _extract_tags(attr):
if not attr:
return []
tags = []
for attribute_key, attribute_value in attr.items():
tag = _convert_attribute_to_tag(attribute_key, attribute_value)
if tag is None:
continue
tags.append(tag)
return tags
def _convert_attribute_to_tag(key, attr):
    """Convert one attribute value into a typed Jaeger tag.

    Supports ``bool``, ``str``, ``int`` and ``float`` values. Any other type
    is logged with a warning and ``None`` is returned.
    """
    # bool is checked first because bool is a subclass of int.
    if isinstance(attr, bool):
        tag = jaeger.Tag(key=key, vBool=attr, vType=jaeger.TagType.BOOL)
    elif isinstance(attr, str):
        tag = jaeger.Tag(key=key, vStr=attr, vType=jaeger.TagType.STRING)
    elif isinstance(attr, int):
        tag = jaeger.Tag(key=key, vLong=attr, vType=jaeger.TagType.LONG)
    elif isinstance(attr, float):
        tag = jaeger.Tag(key=key, vDouble=attr, vType=jaeger.TagType.DOUBLE)
    else:
        logger.warning("Could not serialize attribute %s:%r to tag", key, attr)
        tag = None
    return tag
def _get_long_tag(key, val):
    """Build a Jaeger tag of type LONG holding ``val`` under ``key``."""
    tag_type = jaeger.TagType.LONG
    return jaeger.Tag(vType=tag_type, key=key, vLong=val)
def _get_string_tag(key, val):
    """Build a Jaeger tag of type STRING holding ``val`` under ``key``."""
    tag_type = jaeger.TagType.STRING
    return jaeger.Tag(vType=tag_type, key=key, vStr=val)
def _get_bool_tag(key, val):
    """Build a Jaeger tag of type BOOL holding ``val`` under ``key``."""
    tag_type = jaeger.TagType.BOOL
    return jaeger.Tag(vType=tag_type, key=key, vBool=val)
class AgentClientUDP:
    """UDP client that sends Jaeger batches to an agent.

    Args:
        host_name: The host name of the Jaeger server.
        port: The port of the Jaeger server.
        max_packet_size: Maximum size of UDP packet.
        client: Class used to create the agent client object.
    """

    def __init__(
        self,
        host_name,
        port,
        max_packet_size=UDP_PACKET_MAX_LENGTH,
        client=agent.Client,
    ):
        self.address = (host_name, port)
        self.max_packet_size = max_packet_size
        # The client works against an in-memory buffer; emit() reads the
        # buffer contents back and ships them over UDP.
        self.buffer = TTransport.TMemoryBuffer()
        protocol = TCompactProtocol.TCompactProtocol(trans=self.buffer)
        self.client = client(iprot=protocol)

    def emit(self, batch: jaeger.Batch):
        """Pass ``batch`` to the client and send the buffer as one datagram.

        A payload longer than ``max_packet_size`` is not sent; a warning is
        logged instead.

        Args:
            batch: Object to emit Jaeger spans.
        """
        # pylint: disable=protected-access
        self.client._seqid = 0
        # Empty the underlying BytesIO and rewind it before reuse.
        raw_buffer = self.buffer._buffer
        raw_buffer.truncate(0)
        raw_buffer.seek(0)

        self.client.emitBatch(batch)
        payload = self.buffer.getvalue()
        payload_size = len(payload)

        if payload_size <= self.max_packet_size:
            with socket.socket(
                socket.AF_INET, socket.SOCK_DGRAM
            ) as udp_socket:
                udp_socket.sendto(payload, self.address)
        else:
            logger.warning(
                "Data exceeds the max UDP packet size; size %r, max %r",
                payload_size,
                self.max_packet_size,
            )
class Collector:
    """Submits collected spans to a Thrift HTTP server.

    Args:
        thrift_url: URL of the Jaeger HTTP Thrift.
        auth: Auth tuple that contains username and password for Basic Auth.
        client: Class for creating a Jaeger collector client.
        http_transport: Class for creating new client for Thrift HTTP server.
    """

    def __init__(
        self,
        thrift_url="",
        auth=None,
        client=jaeger.Client,
        http_transport=THttpClient.THttpClient,
    ):
        self.thrift_url = thrift_url
        self.auth = auth
        self.http_transport = http_transport(uri_or_host=thrift_url)
        protocol = TBinaryProtocol.TBinaryProtocol(trans=self.http_transport)
        self.client = client(iprot=protocol)

        # Attach an HTTP Basic Auth header when credentials were supplied.
        if auth is not None:
            credentials = "{}:{}".format(*auth).encode()
            token = base64.b64encode(credentials).decode("ascii")
            headers = {"Authorization": "Basic {}".format(token)}
            self.http_transport.setCustomHeaders(headers)

    def submit(self, batch: jaeger.Batch):
        """Submit a batch to the Thrift HTTP server via the binary protocol.

        A response status outside the 2xx range is logged as an error. The
        HTTP transport is closed afterwards if it is open, whether or not
        the submission raised.

        Args:
            batch: Object to emit Jaeger spans.
        """
        transport = self.http_transport
        try:
            self.client.submitBatches([batch])

            # The status code and message are read from the transport after
            # the call above has completed.
            status_code = transport.code
            status_message = transport.message
            if not 200 <= status_code < 300:
                logger.error(
                    "Traces cannot be uploaded; HTTP status code: %s, message %s",
                    status_code,
                    status_message,
                )
        finally:
            if transport.isOpen():
                transport.close()