# Copyright 2019, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zipkin Span Exporter for OpenTelemetry."""
import json
import logging
from typing import Optional, Sequence
import requests
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.trace import Span, SpanContext, SpanKind
# Default location of the Zipkin collector. The exporter joins these into a
# URL of the form "<protocol>://<host_name>:<port><endpoint>".
DEFAULT_ENDPOINT = "/api/v2/spans"
DEFAULT_HOST_NAME = "localhost"
DEFAULT_PORT = 9411
DEFAULT_PROTOCOL = "http"
# Default for the exporter's ``retry`` flag: a failed upload is reported as
# not retryable unless the caller opts in.
DEFAULT_RETRY = False
# Headers sent with every upload request; the request body is a JSON document.
ZIPKIN_HEADERS = {"Content-Type": "application/json"}
# Maps each OpenTelemetry span kind to the value written to the Zipkin span's
# "kind" field. INTERNAL maps to None.
SPAN_KIND_MAP = {
SpanKind.INTERNAL: None,
SpanKind.SERVER: "SERVER",
SpanKind.CLIENT: "CLIENT",
SpanKind.PRODUCER: "PRODUCER",
SpanKind.CONSUMER: "CONSUMER",
}
# HTTP status codes that count as a successful upload.
SUCCESS_STATUS_CODES = (200, 202)
# Module-level logger, used for upload failures and unserializable tags.
logger = logging.getLogger(__name__)
class ZipkinSpanExporter(SpanExporter):
    """Zipkin span exporter for OpenTelemetry.

    Translates spans into Zipkin JSON span dictionaries and POSTs them to the
    configured Zipkin server URL.

    Args:
        service_name: Service that logged an annotation in a trace. Used as
            the classifier when querying for spans.
        host_name: The host name of the Zipkin server.
        port: The port of the Zipkin server.
        endpoint: The endpoint of the Zipkin server.
        protocol: The protocol used for the request.
        ipv4: Primary IPv4 address associated with this connection.
        ipv6: Primary IPv6 address associated with this connection.
        retry: Set to True to configure the exporter to retry on failure.
    """

    def __init__(
        self,
        service_name: str,
        host_name: str = DEFAULT_HOST_NAME,
        port: int = DEFAULT_PORT,
        endpoint: str = DEFAULT_ENDPOINT,
        protocol: str = DEFAULT_PROTOCOL,
        ipv4: Optional[str] = None,
        ipv6: Optional[str] = None,
        retry: bool = DEFAULT_RETRY,
    ):
        self.service_name = service_name
        self.host_name = host_name
        self.port = port
        self.endpoint = endpoint
        self.protocol = protocol
        # Full collector URL, e.g. "http://localhost:9411/api/v2/spans".
        self.url = "{}://{}:{}{}".format(
            self.protocol, self.host_name, self.port, self.endpoint
        )
        self.ipv4 = ipv4
        self.ipv6 = ipv6
        self.retry = retry

    def export(self, spans: Sequence[Span]) -> SpanExportResult:
        """Upload ``spans`` to the Zipkin server in a single POST request.

        Args:
            spans: The spans to translate and send.

        Returns:
            ``SpanExportResult.SUCCESS`` when the response status code is in
            ``SUCCESS_STATUS_CODES``. Otherwise the failure is logged and the
            result is ``FAILED_RETRYABLE`` if ``retry`` is set, else
            ``FAILED_NOT_RETRYABLE``.
        """
        zipkin_spans = self._translate_to_zipkin(spans)
        # NOTE(review): no timeout is passed, so a stalled server can block
        # this call; consider adding one if callers can tolerate the change.
        result = requests.post(
            url=self.url, data=json.dumps(zipkin_spans), headers=ZIPKIN_HEADERS
        )

        if result.status_code in SUCCESS_STATUS_CODES:
            return SpanExportResult.SUCCESS

        logger.error(
            "Traces cannot be uploaded; status code: %s, message %s",
            result.status_code,
            result.text,
        )
        if self.retry:
            return SpanExportResult.FAILED_RETRYABLE
        return SpanExportResult.FAILED_NOT_RETRYABLE

    def _translate_to_zipkin(self, spans: Sequence[Span]):
        """Convert ``spans`` into a list of Zipkin span dictionaries.

        Every resulting span shares the same ``localEndpoint`` dictionary,
        built from this exporter's service name, port and optional IP
        addresses.
        """
        local_endpoint = {"serviceName": self.service_name, "port": self.port}
        if self.ipv4 is not None:
            local_endpoint["ipv4"] = self.ipv4
        if self.ipv6 is not None:
            local_endpoint["ipv6"] = self.ipv6

        zipkin_spans = []
        for span in spans:
            context = span.get_context()

            # Timestamps in Zipkin spans are integer microseconds.
            # See: https://zipkin.io/pages/instrumenting.html
            start_timestamp_mus = _nsec_to_usec_round(span.start_time)
            duration_mus = _nsec_to_usec_round(span.end_time - span.start_time)

            zipkin_span = {
                # Zipkin expects fixed-width lower-hex IDs (32 characters for
                # the trace ID, 16 for span IDs). A bare "x" format drops
                # leading zeros, so pad explicitly.
                "traceId": format(context.trace_id, "032x"),
                "id": format(context.span_id, "016x"),
                "name": span.name,
                "timestamp": start_timestamp_mus,
                "duration": duration_mus,
                "localEndpoint": local_endpoint,
                "kind": SPAN_KIND_MAP[span.kind],
                "tags": _extract_tags_from_span(span.attributes),
                "annotations": _extract_annotations_from_events(span.events),
            }

            if context.trace_flags.sampled:
                # "debug" is a boolean in the Zipkin v2 span model, so it must
                # serialize as JSON true rather than the number 1.
                zipkin_span["debug"] = True

            # The parent may be a full Span or just its SpanContext; any other
            # value (e.g. None) means no parentId is emitted.
            if isinstance(span.parent, Span):
                zipkin_span["parentId"] = format(
                    span.parent.get_context().span_id, "016x"
                )
            elif isinstance(span.parent, SpanContext):
                zipkin_span["parentId"] = format(span.parent.span_id, "016x")

            zipkin_spans.append(zipkin_span)
        return zipkin_spans

    def shutdown(self) -> None:
        """Do nothing; this exporter keeps no resources that need releasing."""
def _extract_tags_from_span(attr):
if not attr:
return None
tags = {}
for attribute_key, attribute_value in attr.items():
if isinstance(attribute_value, (int, bool, float)):
value = str(attribute_value)
elif isinstance(attribute_value, str):
value = attribute_value[:128]
else:
logger.warning("Could not serialize tag %s", attribute_key)
continue
tags[attribute_key] = value
return tags
def _extract_annotations_from_events(events):
return (
[
{"timestamp": _nsec_to_usec_round(e.timestamp), "value": e.name}
for e in events
]
if events
else None
)
def _nsec_to_usec_round(nsec):
"""Round nanoseconds to microseconds"""
return (nsec + 500) // 10 ** 3