# Source code for opentelemetry.instrumentation.asyncpg

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This library allows tracing PostgreSQL queries made by the
`asyncpg <https://magicstack.github.io/asyncpg/current/>`_ library.

Usage
-----

.. code-block:: python

    import asyncpg
    from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor

    # You can optionally pass a custom TracerProvider to AsyncPGInstrumentor.instrument()
    AsyncPGInstrumentor().instrument()
    conn = await asyncpg.connect(user='user', password='password',
                                 database='database', host='127.0.0.1')
    values = await conn.fetch('''SELECT 42;''')

API
---
"""

import asyncpg
import wrapt
from asyncpg import exceptions

from opentelemetry import trace
from opentelemetry.instrumentation.asyncpg.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCode

_APPLIED = "_opentelemetry_tracer"


def _hydrate_span_from_args(connection, query, parameters) -> dict:
    """Build the span attributes describing one asyncpg call.

    Args:
        connection: Object whose optional ``_params`` attribute exposes
            ``database`` and ``user`` fields; the attribute or either field
            may be absent.
        query: Statement text, or ``None`` to leave ``db.statement`` unset.
        parameters: Sequence of query arguments, or ``None``/empty to leave
            ``db.statement.parameters`` unset.

    Returns:
        A dict that always contains ``db.type`` and contains the remaining
        keys only when a value is available for them.
    """
    span_attributes = {"db.type": "sql"}

    # ``None`` is not a valid OpenTelemetry attribute value, so a missing
    # ``_params`` (or a missing field on it) simply omits the key instead of
    # storing ``None`` for the caller to pass to ``span.set_attribute``.
    params = getattr(connection, "_params", None)
    for attribute, field in (("db.instance", "database"), ("db.user", "user")):
        value = getattr(params, field, None)
        if value is not None:
            span_attributes[attribute] = value

    if query is not None:
        span_attributes["db.statement"] = query

    if parameters:
        span_attributes["db.statement.parameters"] = str(parameters)

    return span_attributes


class AsyncPGInstrumentor(BaseInstrumentor):
    """Wrap asyncpg ``Connection`` query methods in client spans.

    Every wrapped call runs inside a span named ``postgresql`` of kind
    ``SpanKind.CLIENT``.

    Args:
        capture_parameters: When true, the positional query arguments are
            recorded on the span as ``db.statement.parameters``.
    """

    # ``asyncpg.Connection`` methods that are wrapped on instrument and
    # unwrapped on uninstrument.
    _WRAPPED_METHODS = (
        "execute",
        "executemany",
        "fetch",
        "fetchval",
        "fetchrow",
    )

    def __init__(self, capture_parameters=False):
        super().__init__()
        self.capture_parameters = capture_parameters

    def _instrument(self, **kwargs):
        """Store a tracer on the ``asyncpg`` module and wrap the query methods.

        Args:
            **kwargs: May contain ``tracer_provider``; when it is missing or
                ``None`` the globally configured provider is used.
        """
        # Resolve the global provider only when the caller did not supply
        # one, which also tolerates an explicit ``tracer_provider=None``.
        tracer_provider = kwargs.get("tracer_provider")
        if tracer_provider is None:
            tracer_provider = trace.get_tracer_provider()

        setattr(
            asyncpg,
            _APPLIED,
            tracer_provider.get_tracer("asyncpg", __version__),
        )

        for method in self._WRAPPED_METHODS:
            wrapt.wrap_function_wrapper(
                "asyncpg.connection",
                "Connection." + method,
                self._do_execute,
            )

    def _uninstrument(self, **__):
        """Remove the stored tracer and restore the original methods."""
        delattr(asyncpg, _APPLIED)
        for method in self._WRAPPED_METHODS:
            unwrap(asyncpg.Connection, method)

    async def _do_execute(self, func, instance, args, kwargs):
        """Run the wrapped coroutine method inside a ``postgresql`` span.

        Args:
            func: The original (wrapped) connection method.
            instance: The connection the method was called on.
            args: Positional arguments of the call; when present, the first
                one is taken as the statement text.
            kwargs: Keyword arguments of the call.

        Returns:
            Whatever the wrapped method returns.

        Raises:
            Exception: Anything raised by the wrapped method is re-raised
                after the span status has been set to ``StatusCode.ERROR``.
        """
        tracer = getattr(asyncpg, _APPLIED)

        with tracer.start_as_current_span(
            "postgresql", kind=SpanKind.CLIENT
        ) as span:
            if span.is_recording():
                if args:
                    query, parameters = args[0], args[1:]
                else:
                    # The statement may be passed by keyword (``query=`` or,
                    # for ``executemany``, ``command=``); indexing ``args``
                    # here would raise IndexError and break a valid call.
                    query = kwargs.get("query", kwargs.get("command"))
                    parameters = ()

                span_attributes = _hydrate_span_from_args(
                    instance,
                    query,
                    parameters if self.capture_parameters else None,
                )
                for attribute, value in span_attributes.items():
                    span.set_attribute(attribute, value)

            try:
                return await func(*args, **kwargs)
            except Exception:  # pylint: disable=W0703
                if span.is_recording():
                    span.set_status(Status(StatusCode.ERROR))
                raise