# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This library allows to export tracing data to `Zipkin <https://zipkin.io/>`_.
Usage
-----
The **OpenTelemetry Zipkin Exporter** allows to export `OpenTelemetry`_ traces to `Zipkin`_.
This exporter always send traces to the configured Zipkin collector using HTTP.
.. _Zipkin: https://zipkin.io/
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
.. _Specification: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#zipkin-exporter
.. code:: python
from opentelemetry import trace
from opentelemetry.exporter import zipkin
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchExportSpanProcessor
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# create a ZipkinSpanExporter
zipkin_exporter = zipkin.ZipkinSpanExporter(
service_name="my-helloworld-service",
# optional:
# url="http://localhost:9411/api/v2/spans",
# ipv4="",
# ipv6="",
# retry=False,
)
# Create a BatchExportSpanProcessor and add the exporter to it
span_processor = BatchExportSpanProcessor(zipkin_exporter)
# add to the tracer
trace.get_tracer_provider().add_span_processor(span_processor)
with tracer.start_as_current_span("foo"):
print("Hello world!")
The exporter supports endpoint configuration via the OTEL_EXPORTER_ZIPKIN_ENDPOINT environment variables as defined in the `Specification`_
API
---
"""
import json
import logging
import os
from typing import Optional, Sequence
from urllib.parse import urlparse
import requests
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.trace import Span, SpanContext, SpanKind
DEFAULT_RETRY = False
DEFAULT_URL = "http://localhost:9411/api/v2/spans"
DEFAULT_MAX_TAG_VALUE_LENGTH = 128
ZIPKIN_HEADERS = {"Content-Type": "application/json"}
SPAN_KIND_MAP = {
SpanKind.INTERNAL: None,
SpanKind.SERVER: "SERVER",
SpanKind.CLIENT: "CLIENT",
SpanKind.PRODUCER: "PRODUCER",
SpanKind.CONSUMER: "CONSUMER",
}
SUCCESS_STATUS_CODES = (200, 202)
logger = logging.getLogger(__name__)
[docs]class ZipkinSpanExporter(SpanExporter):
"""Zipkin span exporter for OpenTelemetry.
Args:
service_name: Service that logged an annotation in a trace.Classifier
when query for spans.
url: The Zipkin endpoint URL
ipv4: Primary IPv4 address associated with this connection.
ipv6: Primary IPv6 address associated with this connection.
retry: Set to True to configure the exporter to retry on failure.
"""
def __init__(
self,
service_name: str,
url: str = None,
ipv4: Optional[str] = None,
ipv6: Optional[str] = None,
retry: Optional[str] = DEFAULT_RETRY,
max_tag_value_length: Optional[int] = DEFAULT_MAX_TAG_VALUE_LENGTH,
):
self.service_name = service_name
if url is None:
self.url = os.environ.get(
"OTEL_EXPORTER_ZIPKIN_ENDPOINT", DEFAULT_URL
)
else:
self.url = url
self.port = urlparse(self.url).port
self.ipv4 = ipv4
self.ipv6 = ipv6
self.retry = retry
self.max_tag_value_length = max_tag_value_length
[docs] def export(self, spans: Sequence[Span]) -> SpanExportResult:
zipkin_spans = self._translate_to_zipkin(spans)
result = requests.post(
url=self.url, data=json.dumps(zipkin_spans), headers=ZIPKIN_HEADERS
)
if result.status_code not in SUCCESS_STATUS_CODES:
logger.error(
"Traces cannot be uploaded; status code: %s, message %s",
result.status_code,
result.text,
)
if self.retry:
return SpanExportResult.FAILURE
return SpanExportResult.FAILURE
return SpanExportResult.SUCCESS
[docs] def shutdown(self) -> None:
pass
def _translate_to_zipkin(self, spans: Sequence[Span]):
local_endpoint = {"serviceName": self.service_name, "port": self.port}
if self.ipv4 is not None:
local_endpoint["ipv4"] = self.ipv4
if self.ipv6 is not None:
local_endpoint["ipv6"] = self.ipv6
zipkin_spans = []
for span in spans:
context = span.get_span_context()
trace_id = context.trace_id
span_id = context.span_id
# Timestamp in zipkin spans is int of microseconds.
# see: https://zipkin.io/pages/instrumenting.html
start_timestamp_mus = _nsec_to_usec_round(span.start_time)
duration_mus = _nsec_to_usec_round(span.end_time - span.start_time)
zipkin_span = {
# Ensure left-zero-padding of traceId, spanId, parentId
"traceId": format(trace_id, "032x"),
"id": format(span_id, "016x"),
"name": span.name,
"timestamp": start_timestamp_mus,
"duration": duration_mus,
"localEndpoint": local_endpoint,
"kind": SPAN_KIND_MAP[span.kind],
"tags": self._extract_tags_from_span(span),
"annotations": self._extract_annotations_from_events(
span.events
),
}
if span.instrumentation_info is not None:
zipkin_span["tags"][
"otel.instrumentation_library.name"
] = span.instrumentation_info.name
zipkin_span["tags"][
"otel.instrumentation_library.version"
] = span.instrumentation_info.version
if span.status is not None:
zipkin_span["tags"]["otel.status_code"] = str(
span.status.status_code.value
)
if span.status.description is not None:
zipkin_span["tags"][
"otel.status_description"
] = span.status.description
if context.trace_flags.sampled:
zipkin_span["debug"] = True
if isinstance(span.parent, Span):
zipkin_span["parentId"] = format(
span.parent.get_span_context().span_id, "016x"
)
elif isinstance(span.parent, SpanContext):
zipkin_span["parentId"] = format(span.parent.span_id, "016x")
zipkin_spans.append(zipkin_span)
return zipkin_spans
def _extract_tags_from_dict(self, tags_dict):
tags = {}
if not tags_dict:
return tags
for attribute_key, attribute_value in tags_dict.items():
if isinstance(attribute_value, (int, bool, float)):
value = str(attribute_value)
elif isinstance(attribute_value, str):
value = attribute_value
else:
logger.warning("Could not serialize tag %s", attribute_key)
continue
if self.max_tag_value_length > 0:
value = value[: self.max_tag_value_length]
tags[attribute_key] = value
return tags
def _extract_tags_from_span(self, span: Span):
tags = self._extract_tags_from_dict(getattr(span, "attributes", None))
if span.resource:
tags.update(self._extract_tags_from_dict(span.resource.attributes))
return tags
def _extract_annotations_from_events(self, events):
if not events:
return None
annotations = []
for event in events:
attrs = {}
for key, value in event.attributes.items():
if isinstance(value, str):
value = value[: self.max_tag_value_length]
attrs[key] = value
annotations.append(
{
"timestamp": _nsec_to_usec_round(event.timestamp),
"value": json.dumps({event.name: attrs}),
}
)
return annotations
def _nsec_to_usec_round(nsec):
"""Round nanoseconds to microseconds"""
return (nsec + 500) // 10 ** 3