Source code for opentelemetry.exporter.datadog.spanprocessor

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import logging
import threading
import typing

from opentelemetry.context import Context, attach, detach, set_value
from opentelemetry.sdk.trace import Span, SpanProcessor
from opentelemetry.sdk.trace.export import SpanExporter
from opentelemetry.trace import INVALID_TRACE_ID
from opentelemetry.util import time_ns

logger = logging.getLogger(__name__)


class DatadogExportSpanProcessor(SpanProcessor):
    """Datadog exporter span processor.

    DatadogExportSpanProcessor is an implementation of `SpanProcessor` that
    batches all opened spans into a list per trace. When all spans for a
    trace are ended, the trace is queued up for export. This is required for
    exporting to the Datadog Agent, which expects to receive a list of spans
    for each trace.
    """

    _FLUSH_TOKEN = INVALID_TRACE_ID

    def __init__(
        self,
        span_exporter: SpanExporter,
        schedule_delay_millis: float = 5000,
        max_trace_size: int = 4096,
    ):
        if max_trace_size <= 0:
            raise ValueError("max_trace_size must be a positive integer.")

        if schedule_delay_millis <= 0:
            raise ValueError("schedule_delay_millis must be positive.")

        self.span_exporter = span_exporter

        # queue of trace_ids for traces with recently ended spans, for the
        # worker thread to check for exporting
        self.check_traces_queue = (
            collections.deque()
        )  # type: typing.Deque[int]

        self.traces_lock = threading.Lock()
        # dictionary of trace_ids to a list of spans where the first span is
        # the first opened span for the trace
        self.traces = collections.defaultdict(list)
        # counters to keep track of the number of spans and ended spans for
        # a trace_id
        self.traces_spans_count = collections.Counter()
        self.traces_spans_ended_count = collections.Counter()

        self.worker_thread = threading.Thread(target=self.worker, daemon=True)

        # threading conditions used for flushing and shutdown
        self.condition = threading.Condition(threading.Lock())
        self.flush_condition = threading.Condition(threading.Lock())

        # flag to indicate that there is a flush operation in progress
        self._flushing = False

        self.max_trace_size = max_trace_size
        self._spans_dropped = False
        self.schedule_delay_millis = schedule_delay_millis
        self.done = False
        self.worker_thread.start()
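    # A minimal wiring sketch (illustrative, not part of this module): attach
    # the processor to the SDK's global TracerProvider. The DatadogSpanExporter
    # name and its default local-agent endpoint are assumptions based on this
    # package's exporter module.
    #
    #     from opentelemetry import trace
    #     from opentelemetry.sdk.trace import TracerProvider
    #     from opentelemetry.exporter.datadog import (
    #         DatadogExportSpanProcessor,
    #         DatadogSpanExporter,
    #     )
    #
    #     trace.set_tracer_provider(TracerProvider())
    #     exporter = DatadogSpanExporter()  # assumed to default to a local agent
    #     trace.get_tracer_provider().add_span_processor(
    #         DatadogExportSpanProcessor(exporter)
    #     )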
    def on_start(
        self, span: Span, parent_context: typing.Optional[Context] = None
    ) -> None:
        ctx = span.get_span_context()
        trace_id = ctx.trace_id

        with self.traces_lock:
            # check upper bound on number of spans for trace before adding
            # new span
            if self.traces_spans_count[trace_id] == self.max_trace_size:
                logger.warning("Max spans for trace, spans will be dropped.")
                self._spans_dropped = True
                return

            # add span to end of list for a trace and update the counter
            self.traces[trace_id].append(span)
            self.traces_spans_count[trace_id] += 1
    def on_end(self, span: Span) -> None:
        if self.done:
            logger.warning("Already shutdown, dropping span.")
            return

        ctx = span.get_span_context()
        trace_id = ctx.trace_id

        with self.traces_lock:
            self.traces_spans_ended_count[trace_id] += 1
            if self.is_trace_exportable(trace_id):
                self.check_traces_queue.appendleft(trace_id)
    def worker(self):
        timeout = self.schedule_delay_millis / 1e3
        while not self.done:
            if not self._flushing:
                with self.condition:
                    self.condition.wait(timeout)
                    if not self.check_traces_queue:
                        # spurious notification, let's wait again, reset
                        # timeout
                        timeout = self.schedule_delay_millis / 1e3
                        continue
                    if self.done:
                        # missing spans will be sent when calling flush
                        break

            # subtract the duration of this export call from the next
            # timeout
            start = time_ns()
            self.export()
            end = time_ns()

            duration = (end - start) / 1e9
            timeout = self.schedule_delay_millis / 1e3 - duration

        # be sure that all spans are sent
        self._drain_queue()
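    # Timing sketch (hypothetical numbers): with the default
    # schedule_delay_millis of 5000, an export() pass that takes 1.2s leaves
    # 5000 / 1e3 - 1.2 = 3.8s of wait before the next pass.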
    def is_trace_exportable(self, trace_id):
        return (
            self.traces_spans_count[trace_id]
            - self.traces_spans_ended_count[trace_id]
            <= 0
        )
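    # Bookkeeping sketch (hypothetical values): a trace with two started
    # spans and only one ended span is not yet exportable.
    #
    #     processor.traces_spans_count[trace_id] = 2
    #     processor.traces_spans_ended_count[trace_id] = 1
    #     processor.is_trace_exportable(trace_id)  # False: 2 - 1 > 0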
    def export(self) -> None:
        """Exports traces with finished spans."""
        notify_flush = False
        export_trace_ids = []

        while self.check_traces_queue:
            trace_id = self.check_traces_queue.pop()
            if trace_id is self._FLUSH_TOKEN:
                notify_flush = True
            else:
                with self.traces_lock:
                    # check whether trace is exportable again in case that
                    # new spans were started since we last concluded trace
                    # was exportable
                    if self.is_trace_exportable(trace_id):
                        export_trace_ids.append(trace_id)
                        del self.traces_spans_count[trace_id]
                        del self.traces_spans_ended_count[trace_id]

        if len(export_trace_ids) > 0:
            # suppress instrumentation so the exporter's own calls are not
            # traced
            token = attach(set_value("suppress_instrumentation", True))

            for trace_id in export_trace_ids:
                with self.traces_lock:
                    try:
                        # Ignore type b/c the Optional[None]+slicing is too
                        # "clever" for mypy
                        self.span_exporter.export(self.traces[trace_id])  # type: ignore
                    # pylint: disable=broad-except
                    except Exception:
                        logger.exception(
                            "Exception while exporting Span batch."
                        )
                    finally:
                        del self.traces[trace_id]
            detach(token)

        if notify_flush:
            with self.flush_condition:
                self.flush_condition.notify()
    def _drain_queue(self):
        """Export all elements until queue is empty.

        Can only be called from the worker thread context because it invokes
        `export` that is not thread safe.
        """
        while self.check_traces_queue:
            self.export()
    def force_flush(self, timeout_millis: int = 30000) -> bool:
        if self.done:
            logger.warning("Already shutdown, ignoring call to force_flush().")
            return True

        self._flushing = True
        self.check_traces_queue.appendleft(self._FLUSH_TOKEN)

        # wake up worker thread
        with self.condition:
            self.condition.notify_all()

        # wait for token to be processed
        with self.flush_condition:
            ret = self.flush_condition.wait(timeout_millis / 1e3)

        self._flushing = False

        if not ret:
            logger.warning("Timeout was exceeded in force_flush().")
        return ret
    def shutdown(self) -> None:
        # signal the worker thread to finish and then wait for it
        self.done = True
        with self.condition:
            self.condition.notify_all()
        self.worker_thread.join()
        self.span_exporter.shutdown()
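# Shutdown sketch (illustrative): at application exit, flush any traces still
# buffered, then stop the worker thread and the underlying exporter.
#
#     processor.force_flush(timeout_millis=10000)
#     processor.shutdown()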