Source code for opentelemetry.instrumentation.aiohttp_client

# Copyright 2020, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The opentelemetry-instrumentation-aiohttp-client package allows tracing HTTP
requests made by the aiohttp client library.

Usage
-----
Explicitly instrumenting a single client session:

.. code:: python

    import aiohttp
    from opentelemetry.instrumentation.aiohttp_client import (
        create_trace_config,
        url_path_span_name
    )
    import yarl

    def strip_query_params(url: yarl.URL) -> str:
        return str(url.with_query(None))

    async with aiohttp.ClientSession(trace_configs=[create_trace_config(
            # Remove all query params from the URL attribute on the span.
            url_filter=strip_query_params,
            # Use the URL's path as the span name.
            span_name=url_path_span_name
    )]) as session:
        async with session.get(url) as response:
            await response.text()

Instrumenting all client sessions:

.. code:: python

    import aiohttp
    from opentelemetry.instrumentation.aiohttp_client import (
        AioHttpClientInstrumentor
    )

    # Enable instrumentation
    AioHttpClientInstrumentor().instrument()

    # Create a session and make an HTTP get request
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            await response.text()

API
---
"""

import socket
import types
import typing

import aiohttp
import wrapt

from opentelemetry import context as context_api
from opentelemetry import propagators, trace
from opentelemetry.instrumentation.aiohttp_client.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
    http_status_to_status_code,
    unwrap,
)
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
from opentelemetry.trace.status import Status, StatusCode

_UrlFilterT = typing.Optional[typing.Callable[[str], str]]
_SpanNameT = typing.Optional[
    typing.Union[typing.Callable[[aiohttp.TraceRequestStartParams], str], str]
]


[docs]def url_path_span_name(params: aiohttp.TraceRequestStartParams) -> str: """Extract a span name from the request URL path. A simple callable to extract the path portion of the requested URL for use as the span name. :param aiohttp.TraceRequestStartParams params: Parameters describing the traced request. :return: The URL path. :rtype: str """ return params.url.path
[docs]def create_trace_config( url_filter: _UrlFilterT = None, span_name: _SpanNameT = None, tracer_provider: TracerProvider = None, ) -> aiohttp.TraceConfig: """Create an aiohttp-compatible trace configuration. One span is created for the entire HTTP request, including initial TCP/TLS setup if the connection doesn't exist. By default the span name is set to the HTTP request method. Example usage: .. code:: python import aiohttp from opentelemetry.instrumentation.aiohttp_client import create_trace_config async with aiohttp.ClientSession(trace_configs=[create_trace_config()]) as session: async with session.get(url) as response: await response.text() :param url_filter: A callback to process the requested URL prior to adding it as a span attribute. This can be useful to remove sensitive data such as API keys or user personal information. :param str span_name: Override the default span name. :param tracer_provider: optional TracerProvider from which to get a Tracer :return: An object suitable for use with :py:class:`aiohttp.ClientSession`. :rtype: :py:class:`aiohttp.TraceConfig` """ # `aiohttp.TraceRequestStartParams` resolves to `aiohttp.tracing.TraceRequestStartParams` # which doesn't exist in the aiottp intersphinx inventory. # Explicitly specify the type for the `span_name` param and rtype to work # around this issue. tracer = get_tracer(__name__, __version__, tracer_provider) def _end_trace(trace_config_ctx: types.SimpleNamespace): context_api.detach(trace_config_ctx.token) trace_config_ctx.span.end() async def on_request_start( unused_session: aiohttp.ClientSession, trace_config_ctx: types.SimpleNamespace, params: aiohttp.TraceRequestStartParams, ): if context_api.get_value("suppress_instrumentation"): trace_config_ctx.span = None return http_method = params.method.upper() if trace_config_ctx.span_name is None: request_span_name = "HTTP {}".format(http_method) elif callable(trace_config_ctx.span_name): request_span_name = str(trace_config_ctx.span_name(params)) else: request_span_name = str(trace_config_ctx.span_name) trace_config_ctx.span = trace_config_ctx.tracer.start_span( request_span_name, kind=SpanKind.CLIENT, ) if trace_config_ctx.span.is_recording(): attributes = { "component": "http", "http.method": http_method, "http.url": trace_config_ctx.url_filter(params.url) if callable(trace_config_ctx.url_filter) else str(params.url), } for key, value in attributes.items(): trace_config_ctx.span.set_attribute(key, value) trace_config_ctx.token = context_api.attach( trace.set_span_in_context(trace_config_ctx.span) ) propagators.inject(type(params.headers).__setitem__, params.headers) async def on_request_end( unused_session: aiohttp.ClientSession, trace_config_ctx: types.SimpleNamespace, params: aiohttp.TraceRequestEndParams, ): if trace_config_ctx.span is None: return if trace_config_ctx.span.is_recording(): trace_config_ctx.span.set_status( Status(http_status_to_status_code(int(params.response.status))) ) trace_config_ctx.span.set_attribute( "http.status_code", params.response.status ) trace_config_ctx.span.set_attribute( "http.status_text", params.response.reason ) _end_trace(trace_config_ctx) async def on_request_exception( unused_session: aiohttp.ClientSession, trace_config_ctx: types.SimpleNamespace, params: aiohttp.TraceRequestExceptionParams, ): if trace_config_ctx.span is None: return if trace_config_ctx.span.is_recording() and params.exception: trace_config_ctx.span.set_status(Status(StatusCode.ERROR)) trace_config_ctx.span.record_exception(params.exception) _end_trace(trace_config_ctx) def _trace_config_ctx_factory(**kwargs): kwargs.setdefault("trace_request_ctx", {}) return types.SimpleNamespace( span_name=span_name, tracer=tracer, url_filter=url_filter, **kwargs ) trace_config = aiohttp.TraceConfig( trace_config_ctx_factory=_trace_config_ctx_factory ) trace_config.on_request_start.append(on_request_start) trace_config.on_request_end.append(on_request_end) trace_config.on_request_exception.append(on_request_exception) return trace_config
def _instrument( tracer_provider: TracerProvider = None, url_filter: _UrlFilterT = None, span_name: _SpanNameT = None, ): """Enables tracing of all ClientSessions When a ClientSession gets created a TraceConfig is automatically added to the session's trace_configs. """ # pylint:disable=unused-argument def instrumented_init(wrapped, instance, args, kwargs): if context_api.get_value("suppress_instrumentation"): return wrapped(*args, **kwargs) trace_configs = list(kwargs.get("trace_configs") or ()) trace_config = create_trace_config( url_filter=url_filter, span_name=span_name, tracer_provider=tracer_provider, ) trace_config.opentelemetry_aiohttp_instrumented = True trace_configs.append(trace_config) kwargs["trace_configs"] = trace_configs return wrapped(*args, **kwargs) wrapt.wrap_function_wrapper( aiohttp.ClientSession, "__init__", instrumented_init ) def _uninstrument(): """Disables instrumenting for all newly created ClientSessions""" unwrap(aiohttp.ClientSession, "__init__") def _uninstrument_session(client_session: aiohttp.ClientSession): """Disables instrumentation for the given ClientSession""" # pylint: disable=protected-access trace_configs = client_session._trace_configs client_session._trace_configs = [ trace_config for trace_config in trace_configs if not hasattr(trace_config, "opentelemetry_aiohttp_instrumented") ]
[docs]class AioHttpClientInstrumentor(BaseInstrumentor): """An instrumentor for aiohttp client sessions See `BaseInstrumentor` """ def _instrument(self, **kwargs): """Instruments aiohttp ClientSession Args: **kwargs: Optional arguments ``tracer_provider``: a TracerProvider, defaults to global ``url_filter``: A callback to process the requested URL prior to adding it as a span attribute. This can be useful to remove sensitive data such as API keys or user personal information. ``span_name``: Override the default span name. """ _instrument( tracer_provider=kwargs.get("tracer_provider"), url_filter=kwargs.get("url_filter"), span_name=kwargs.get("span_name"), ) def _uninstrument(self, **kwargs): _uninstrument()
[docs] @staticmethod def uninstrument_session(client_session: aiohttp.ClientSession): """Disables instrumentation for the given session""" _uninstrument_session(client_session)