Source code for opentelemetry.instrumentation.asgi

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The opentelemetry-instrumentation-asgi package provides an ASGI middleware that can be used
on any ASGI framework (such as Django-channels / Quart) to track request
timing through OpenTelemetry.
"""

import operator
import typing
import urllib
from functools import wraps
from typing import Tuple

from asgiref.compatibility import guarantee_single_callable

from opentelemetry import context, propagators, trace
from opentelemetry.instrumentation.asgi.version import __version__  # noqa
from opentelemetry.instrumentation.utils import http_status_to_status_code
from opentelemetry.trace.propagation.textmap import DictGetter
from opentelemetry.trace.status import Status, StatusCode


class CarrierGetter(DictGetter):
    """Propagation getter that reads HTTP header values from an ASGI scope."""

    def get(self, carrier: dict, key: str) -> typing.List[str]:
        """Getter implementation to retrieve a HTTP header value from the
        ASGI scope.

        Args:
            carrier: ASGI scope object
            key: header name in scope

        Returns:
            A list holding the decoded value of every header entry whose
            name matches ``key`` (compared case-insensitively), or an empty
            list when there is no match or the scope carries no headers.
        """
        # ``.get`` yields None when the scope has no "headers" entry (or it
        # is explicitly None); treat that as "no headers" instead of failing
        # while iterating over None.
        headers = carrier.get("headers") or ()
        # HTTP header names are case-insensitive, so normalise both sides.
        wanted = key.lower()
        return [
            value.decode("utf8")
            for (name, value) in headers
            if name.decode("utf8").lower() == wanted
        ]
# Module-level CarrierGetter instance, used by collect_request_attributes and
# by OpenTelemetryMiddleware when extracting the propagated context.
carrier_getter = CarrierGetter()
def collect_request_attributes(scope):
    """Collects HTTP request attributes from the ASGI scope and returns a
    dictionary to be used as span creation attributes.

    Args:
        scope: the ASGI scope dictionary; ``scope["type"]`` must be present.

    Returns:
        A dictionary of attribute names to values. Entries whose value is
        None are left out.
    """
    # Imported here so the ``urllib.parse`` submodule is guaranteed to be
    # loaded; a bare ``import urllib`` does not import it.
    from urllib.parse import unquote

    server = scope.get("server") or ["0.0.0.0", 80]
    server_host = server[0]
    port = server[1]
    # Port 80 stays implicit. Per the ASGI spec a unix-socket server is
    # reported as [path, None]; without the None check the host would be
    # rendered as "<path>:None".
    if port is not None and port != 80:
        server_host = server_host + ":" + str(port)

    full_path = scope.get("root_path", "") + scope.get("path", "")
    http_url = scope.get("scheme", "http") + "://" + server_host + full_path

    query_string = scope.get("query_string")
    if query_string:
        if isinstance(query_string, bytes):
            query_string = query_string.decode("utf8")
        http_url = http_url + ("?" + unquote(query_string))

    result = {
        "component": scope["type"],
        "http.scheme": scope.get("scheme"),
        "http.host": server_host,
        "host.port": port,
        "http.flavor": scope.get("http_version"),
        "http.target": scope.get("path"),
        "http.url": http_url,
    }

    http_method = scope.get("method")
    if http_method:
        result["http.method"] = http_method

    # Several Host headers, if present, are reported comma-separated.
    http_host_value = ",".join(carrier_getter.get(scope, "host"))
    if http_host_value:
        result["http.server_name"] = http_host_value

    # Only the first User-Agent header is reported.
    http_user_agent = carrier_getter.get(scope, "user-agent")
    if http_user_agent:
        result["http.user_agent"] = http_user_agent[0]

    client = scope.get("client")
    if client is not None:
        result["net.peer.ip"] = client[0]
        result["net.peer.port"] = client[1]

    # remove None values
    return {k: v for k, v in result.items() if v is not None}
def set_status_code(span, status_code):
    """Adds HTTP response attributes to span using the status_code argument.

    Args:
        span: the span to annotate; it is left untouched when it is not
            recording.
        status_code: the HTTP status, given as anything ``int()`` accepts.
            A value that cannot be converted marks the span with an ERROR
            status instead of raising.
    """
    if not span.is_recording():
        return

    try:
        status_code = int(status_code)
    except (TypeError, ValueError):
        # int() raises ValueError for malformed strings and TypeError for
        # non-numeric objects such as None; both mean "not an HTTP status"
        # and must not propagate out of the instrumentation.
        span.set_status(
            Status(
                StatusCode.ERROR,
                "Non-integer HTTP status: " + repr(status_code),
            )
        )
    else:
        span.set_attribute("http.status_code", status_code)
        span.set_status(Status(http_status_to_status_code(status_code)))
def get_default_span_details(scope: dict) -> Tuple[str, dict]:
    """Default implementation for span_details_callback

    Args:
        scope: the asgi scope dictionary

    Returns:
        a tuple of the span name, and any attributes to attach to the span.
        The name is the scope's "method" when that is set and non-empty,
        otherwise the scope's "path"; the attributes dictionary is empty.
    """
    span_name = scope.get("method")
    if not span_name:
        # No (or empty) method in the scope: use the path instead.
        span_name = scope.get("path")
    extra_attributes = {}
    return span_name, extra_attributes
class OpenTelemetryMiddleware:
    """The ASGI application middleware.

    This class is an ASGI middleware that starts and annotates spans for any
    requests it is invoked with.

    Args:
        app: The ASGI application callable to forward requests to.
        span_details_callback: Callback which should return a string and a
            dictionary, representing the desired span name and any
            additional span attributes to set.
            Optional: Defaults to get_default_span_details.
    """

    def __init__(self, app, span_details_callback=None):
        self.app = guarantee_single_callable(app)
        self.tracer = trace.get_tracer(__name__, __version__)
        self.span_details_callback = (
            span_details_callback or get_default_span_details
        )

    async def __call__(self, scope, receive, send):
        """The ASGI application

        Args:
            scope: A ASGI environment.
            receive: An awaitable callable yielding dictionaries
            send: An awaitable callable taking a single dictionary as argument.
        """
        # Scopes other than http/websocket are forwarded without any spans.
        if scope["type"] not in ("http", "websocket"):
            return await self.app(scope, receive, send)

        token = context.attach(propagators.extract(carrier_getter, scope))
        try:
            # The callback runs inside the try block so that the attached
            # context is detached even when the callback raises.
            span_name, additional_attributes = self.span_details_callback(
                scope
            )

            with self.tracer.start_as_current_span(
                span_name + " asgi", kind=trace.SpanKind.SERVER,
            ) as span:
                if span.is_recording():
                    attributes = collect_request_attributes(scope)
                    attributes.update(additional_attributes)
                    for key, value in attributes.items():
                        span.set_attribute(key, value)

                @wraps(receive)
                async def wrapped_receive():
                    # One span per receive() call made by the application.
                    with self.tracer.start_as_current_span(
                        span_name + " asgi." + scope["type"] + ".receive"
                    ) as receive_span:
                        message = await receive()
                        if receive_span.is_recording():
                            if message["type"] == "websocket.receive":
                                set_status_code(receive_span, 200)
                            receive_span.set_attribute(
                                "type", message["type"]
                            )
                    return message

                @wraps(send)
                async def wrapped_send(message):
                    # One span per send() call made by the application.
                    with self.tracer.start_as_current_span(
                        span_name + " asgi." + scope["type"] + ".send"
                    ) as send_span:
                        if send_span.is_recording():
                            if message["type"] == "http.response.start":
                                status_code = message["status"]
                                set_status_code(send_span, status_code)
                            elif message["type"] == "websocket.send":
                                set_status_code(send_span, 200)
                            send_span.set_attribute("type", message["type"])
                        await send(message)

                await self.app(scope, wrapped_receive, wrapped_send)
        finally:
            context.detach(token)