Source code for opentelemetry.ext.psycopg2

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The integration with PostgreSQL supports the `Psycopg`_ library and is enabled
by calling ``trace_integration``.

.. _Psycopg: http://initd.org/psycopg/

Usage
-----

.. code-block:: python

    import psycopg2
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.ext.psycopg2 import trace_integration

    trace.set_tracer_provider(TracerProvider())

    trace_integration()
    cnx = psycopg2.connect(database='Database')
    cursor = cnx.cursor()
    cursor.execute("INSERT INTO test (testField) VALUES (123)")
    cursor.close()
    cnx.close()

API
---
"""

import logging
import typing

import psycopg2
import wrapt
from psycopg2.sql import Composable

from opentelemetry.ext.dbapi import DatabaseApiIntegration, TracedCursor
from opentelemetry.ext.psycopg2.version import __version__
from opentelemetry.trace import Tracer, get_tracer

logger = logging.getLogger(__name__)

DATABASE_COMPONENT = "postgresql"
DATABASE_TYPE = "sql"


def trace_integration(tracer_provider=None):
    """Integrate with PostgreSQL Psycopg library.

    Psycopg: http://initd.org/psycopg/

    Wraps ``psycopg2.connect`` so that each connection it returns uses a
    traced cursor class as its default ``cursor_factory``. If the wrapping
    itself fails, a warning is logged and no exception is propagated.

    Args:
        tracer_provider: Optional tracer provider forwarded as the third
            argument to ``get_tracer``. Defaults to ``None``.
    """
    tracer = get_tracer(__name__, __version__, tracer_provider)

    # Attribute name -> dotted path; presumably resolved against the
    # connection object by DatabaseApiIntegration -- confirm in
    # opentelemetry.ext.dbapi.
    connection_attributes = {
        "database": "info.dbname",
        "port": "info.port",
        "host": "info.host",
        "user": "info.user",
    }
    db_integration = DatabaseApiIntegration(
        tracer,
        DATABASE_COMPONENT,
        database_type=DATABASE_TYPE,
        connection_attributes=connection_attributes,
    )

    # Defined before ``psycopg2.connect`` is patched so ``wrap_connect`` can
    # never run while this name is still unbound in the enclosing scope.
    class PsycopgTraceCursor(psycopg2.extensions.cursor):
        """Cursor that routes statement execution through ``TracedCursor``."""

        def __init__(self, *args, **kwargs):
            self._traced_cursor = TracedCursor(db_integration)
            super().__init__(*args, **kwargs)

        # pylint: disable=redefined-builtin
        def execute(self, query, vars=None):
            # Composable queries are rendered to plain SQL text before being
            # handed to the traced execution.
            if isinstance(query, Composable):
                query = query.as_string(self)
            return self._traced_cursor.traced_execution(
                super().execute, query, vars
            )

        # pylint: disable=redefined-builtin
        def executemany(self, query, vars):
            if isinstance(query, Composable):
                query = query.as_string(self)
            return self._traced_cursor.traced_execution(
                super().executemany, query, vars
            )

        # pylint: disable=redefined-builtin
        def callproc(self, procname, vars=None):
            return self._traced_cursor.traced_execution(
                super().callproc, procname, vars
            )

    # pylint: disable=unused-argument
    def wrap_connect(
        connect_func: typing.Callable[..., typing.Any],
        instance: typing.Any,
        args: typing.Tuple[typing.Any, ...],
        kwargs: typing.Dict[str, typing.Any],
    ):
        """Call the real ``connect`` and make the connection traced."""
        connection = connect_func(*args, **kwargs)
        db_integration.get_connection_attributes(connection)
        # NOTE(review): this unconditionally replaces any ``cursor_factory``
        # the caller passed to ``connect()`` -- confirm that is intended.
        connection.cursor_factory = PsycopgTraceCursor
        return connection

    try:
        wrapt.wrap_function_wrapper(psycopg2, "connect", wrap_connect)
    except Exception as ex:  # pylint: disable=broad-except
        # Best effort: a failed integration must not break the application.
        logger.warning("Failed to integrate with psycopg2. %s", ex)