Source code for opentelemetry.instrumentation.system_metrics

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Instrument to report system (CPU, memory, swap, disk, network) and
process (CPU, memory, garbage collection) metrics. By default, the
following metrics are configured:

.. code:: python

    {
        "system.cpu.time": ["idle", "user", "system", "irq"],
        "system.cpu.utilization": ["idle", "user", "system", "irq"],
        "system.memory.usage": ["used", "free", "cached"],
        "system.memory.utilization": ["used", "free", "cached"],
        "system.swap.usage": ["used", "free"],
        "system.swap.utilization": ["used", "free"],
        "system.disk.io": ["read", "write"],
        "system.disk.operations": ["read", "write"],
        "system.disk.time": ["read", "write"],
        "system.disk.merged": ["read", "write"],
        "system.network.dropped.packets": ["transmit", "receive"],
        "system.network.packets": ["transmit", "receive"],
        "system.network.errors": ["transmit", "receive"],
        "system.network.io": ["transmit", "receive"],
        "system.network.connections": ["family", "type"],
        "runtime.memory": ["rss", "vms"],
        "runtime.cpu.time": ["user", "system"],
    }

Usage
-----

.. code:: python

    from opentelemetry import metrics
    from opentelemetry.instrumentation.system_metrics import SystemMetrics
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter

    metrics.set_meter_provider(MeterProvider())
    exporter = ConsoleMetricsExporter()
    SystemMetrics(exporter)

    # metrics are collected asynchronously
    input("...")

    # to configure custom metrics
    configuration = {
        "system.memory.usage": ["used", "free", "cached"],
        "system.cpu.time": ["idle", "user", "system", "irq"],
        "system.network.io": ["transmit", "receive"],
        "runtime.memory": ["rss", "vms"],
        "runtime.cpu.time": ["user", "system"],
    }
    SystemMetrics(exporter, config=configuration)

API
---
"""

import gc
import os
import typing
from platform import python_implementation

import psutil

from opentelemetry import metrics
from opentelemetry.sdk.metrics.export import MetricsExporter
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.util import get_dict_as_key


class SystemMetrics:
    """Report system and process metrics through OpenTelemetry observers.

    On construction, a meter is obtained from ``metrics.get_meter(__name__)``,
    a ``PushController`` is created for it and ``exporter``, and one observer
    per metric is registered. Each observer callback reads current values
    from ``psutil`` (or ``gc``) and observes them with a per-metric label
    dict seeded from ``labels``.

    Args:
        exporter: exporter the ``PushController`` pushes metrics to
        interval: push interval forwarded to the ``PushController``
        labels: labels attached to every observation
        config: maps a metric name to the label values (states, directions,
            fields) to report for it; ``None`` selects the default
            configuration. Metrics absent from a custom configuration report
            no observations. The runtime GC count is always reported.
    """

    # Suffixes psutil uses in per-NIC counter names, keyed by the configured
    # direction: ``drop{in,out}``/``err{in,out}`` and
    # ``packets_{recv,sent}``/``bytes_{recv,sent}``.
    _IN_OUT = {"receive": "in", "transmit": "out"}
    _RECV_SENT = {"receive": "recv", "transmit": "sent"}
    # Earlier documentation spelled "transmit" as "trasmit"; user
    # configurations written against it keep working and report "transmit".
    _DIRECTION_ALIASES = {"trasmit": "transmit"}

    # pylint: disable=too-many-statements
    def __init__(
        self,
        exporter: MetricsExporter,
        interval: int = 30,
        labels: typing.Optional[typing.Dict[str, str]] = None,
        config: typing.Optional[typing.Dict[str, typing.List[str]]] = None,
    ):
        self._labels = {} if labels is None else labels
        self.meter = metrics.get_meter(__name__)
        self.controller = PushController(
            meter=self.meter, exporter=exporter, interval=interval
        )
        self._python_implementation = python_implementation().lower()
        if config is None:
            self._config = {
                "system.cpu.time": ["idle", "user", "system", "irq"],
                "system.cpu.utilization": ["idle", "user", "system", "irq"],
                "system.memory.usage": ["used", "free", "cached"],
                "system.memory.utilization": ["used", "free", "cached"],
                "system.swap.usage": ["used", "free"],
                "system.swap.utilization": ["used", "free"],
                # TODO system.swap.page.faults, system.swap.page.operations
                "system.disk.io": ["read", "write"],
                "system.disk.operations": ["read", "write"],
                "system.disk.time": ["read", "write"],
                "system.disk.merged": ["read", "write"],
                # TODO system.filesystem.usage, system.filesystem.utilization
                "system.network.dropped.packets": ["transmit", "receive"],
                "system.network.packets": ["transmit", "receive"],
                "system.network.errors": ["transmit", "receive"],
                "system.network.io": ["transmit", "receive"],
                "system.network.connections": ["family", "type"],
                "runtime.memory": ["rss", "vms"],
                "runtime.cpu.time": ["user", "system"],
            }
        else:
            self._config = config

        self._proc = psutil.Process(os.getpid())

        # One label dict per metric: callbacks mutate these in place, so
        # metrics must not share a dict.
        self._system_cpu_time_labels = self._labels.copy()
        self._system_cpu_utilization_labels = self._labels.copy()

        self._system_memory_usage_labels = self._labels.copy()
        self._system_memory_utilization_labels = self._labels.copy()

        self._system_swap_usage_labels = self._labels.copy()
        self._system_swap_utilization_labels = self._labels.copy()

        self._system_disk_io_labels = self._labels.copy()
        self._system_disk_operations_labels = self._labels.copy()
        self._system_disk_time_labels = self._labels.copy()
        self._system_disk_merged_labels = self._labels.copy()

        self._system_network_dropped_packets_labels = self._labels.copy()
        self._system_network_packets_labels = self._labels.copy()
        self._system_network_errors_labels = self._labels.copy()
        self._system_network_io_labels = self._labels.copy()
        self._system_network_connections_labels = self._labels.copy()

        self._runtime_memory_labels = self._labels.copy()
        self._runtime_cpu_time_labels = self._labels.copy()
        self._runtime_gc_count_labels = self._labels.copy()

        self.meter.register_sumobserver(
            callback=self._get_system_cpu_time,
            name="system.cpu.time",
            description="System CPU time",
            unit="seconds",
            value_type=float,
        )

        self.meter.register_valueobserver(
            callback=self._get_system_cpu_utilization,
            name="system.cpu.utilization",
            description="System CPU utilization",
            unit="1",
            value_type=float,
        )

        self.meter.register_valueobserver(
            callback=self._get_system_memory_usage,
            name="system.memory.usage",
            description="System memory usage",
            unit="bytes",
            value_type=int,
        )

        self.meter.register_valueobserver(
            callback=self._get_system_memory_utilization,
            name="system.memory.utilization",
            description="System memory utilization",
            unit="1",
            value_type=float,
        )

        # NOTE(review): the observed values come straight from
        # psutil.swap_memory(), which appear to be bytes rather than
        # pages -- confirm the unit.
        self.meter.register_valueobserver(
            callback=self._get_system_swap_usage,
            name="system.swap.usage",
            description="System swap usage",
            unit="pages",
            value_type=int,
        )

        self.meter.register_valueobserver(
            callback=self._get_system_swap_utilization,
            name="system.swap.utilization",
            description="System swap utilization",
            unit="1",
            value_type=float,
        )

        # TODO register system.swap.page_faults and
        # system.swap.page_operations (sum observers) once implemented.

        self.meter.register_sumobserver(
            callback=self._get_system_disk_io,
            name="system.disk.io",
            description="System disk IO",
            unit="bytes",
            value_type=int,
        )

        self.meter.register_sumobserver(
            callback=self._get_system_disk_operations,
            name="system.disk.operations",
            description="System disk operations",
            unit="operations",
            value_type=int,
        )

        self.meter.register_sumobserver(
            callback=self._get_system_disk_time,
            name="system.disk.time",
            description="System disk time",
            unit="seconds",
            value_type=float,
        )

        self.meter.register_sumobserver(
            callback=self._get_system_disk_merged,
            name="system.disk.merged",
            description="System disk merged",
            unit="1",
            value_type=int,
        )

        # TODO register system.filesystem.usage and
        # system.filesystem.utilization (value observers) once implemented.

        self.meter.register_sumobserver(
            callback=self._get_system_network_dropped_packets,
            name="system.network.dropped_packets",
            description="System network dropped_packets",
            unit="packets",
            value_type=int,
        )

        self.meter.register_sumobserver(
            callback=self._get_system_network_packets,
            name="system.network.packets",
            description="System network packets",
            unit="packets",
            value_type=int,
        )

        self.meter.register_sumobserver(
            callback=self._get_system_network_errors,
            name="system.network.errors",
            description="System network errors",
            unit="errors",
            value_type=int,
        )

        self.meter.register_sumobserver(
            callback=self._get_system_network_io,
            name="system.network.io",
            description="System network io",
            unit="bytes",
            value_type=int,
        )

        self.meter.register_updownsumobserver(
            callback=self._get_system_network_connections,
            name="system.network.connections",
            description="System network connections",
            unit="connections",
            value_type=int,
        )

        # Process memory (rss/vms) can shrink, so it is not a monotonic sum.
        self.meter.register_updownsumobserver(
            callback=self._get_runtime_memory,
            name="runtime.{}.memory".format(self._python_implementation),
            description="Runtime {} memory".format(
                self._python_implementation
            ),
            unit="bytes",
            value_type=int,
        )

        self.meter.register_sumobserver(
            callback=self._get_runtime_cpu_time,
            name="runtime.{}.cpu_time".format(self._python_implementation),
            description="Runtime {} CPU time".format(
                self._python_implementation
            ),
            unit="seconds",
            value_type=float,
        )

        # gc.get_count() values drop back after each collection, so this is
        # not a monotonic sum; the values are counts, not bytes.
        self.meter.register_updownsumobserver(
            callback=self._get_runtime_gc_count,
            name="runtime.{}.gc_count".format(self._python_implementation),
            description="Runtime {} GC count".format(
                self._python_implementation
            ),
            unit="count",
            value_type=int,
        )

    def _configured(self, name: str) -> typing.List[str]:
        """Return the configured label values for metric ``name``.

        Args:
            name: configuration key of the metric

        Returns:
            the configured values, or an empty list when a custom
            configuration omits the metric, so its callback observes nothing
            instead of raising ``KeyError`` on every collection
        """
        return self._config.get(name, [])

    def _get_system_cpu_time(self, observer: metrics.SumObserver) -> None:
        """Observer callback for system CPU time

        Observes, for every CPU (labelled 1-based), each configured state
        that psutil's per-CPU times expose.

        Args:
            observer: the observer to update
        """
        for cpu, times in enumerate(psutil.cpu_times(percpu=True)):
            for metric in self._configured("system.cpu.time"):
                if hasattr(times, metric):
                    self._system_cpu_time_labels["state"] = metric
                    self._system_cpu_time_labels["cpu"] = cpu + 1
                    observer.observe(
                        getattr(times, metric), self._system_cpu_time_labels
                    )

    def _get_system_cpu_utilization(
        self, observer: metrics.ValueObserver
    ) -> None:
        """Observer callback for system CPU utilization

        psutil reports percentages; they are divided by 100 to observe a
        0..1 ratio per CPU (labelled 1-based) and state.

        Args:
            observer: the observer to update
        """
        for cpu, times_percent in enumerate(
            psutil.cpu_times_percent(percpu=True)
        ):
            for metric in self._configured("system.cpu.utilization"):
                if hasattr(times_percent, metric):
                    self._system_cpu_utilization_labels["state"] = metric
                    self._system_cpu_utilization_labels["cpu"] = cpu + 1
                    observer.observe(
                        getattr(times_percent, metric) / 100,
                        self._system_cpu_utilization_labels,
                    )

    def _get_system_memory_usage(
        self, observer: metrics.ValueObserver
    ) -> None:
        """Observer callback for memory usage

        Args:
            observer: the observer to update
        """
        virtual_memory = psutil.virtual_memory()
        for metric in self._configured("system.memory.usage"):
            self._system_memory_usage_labels["state"] = metric
            if hasattr(virtual_memory, metric):
                observer.observe(
                    getattr(virtual_memory, metric),
                    self._system_memory_usage_labels,
                )

    def _get_system_memory_utilization(
        self, observer: metrics.ValueObserver
    ) -> None:
        """Observer callback for memory utilization

        Each configured state is observed as a fraction of total memory.

        Args:
            observer: the observer to update
        """
        system_memory = psutil.virtual_memory()
        if not system_memory.total:
            # Nothing meaningful to report, and dividing would raise.
            return
        for metric in self._configured("system.memory.utilization"):
            self._system_memory_utilization_labels["state"] = metric
            if hasattr(system_memory, metric):
                observer.observe(
                    getattr(system_memory, metric) / system_memory.total,
                    self._system_memory_utilization_labels,
                )

    def _get_system_swap_usage(self, observer: metrics.ValueObserver) -> None:
        """Observer callback for swap usage

        Args:
            observer: the observer to update
        """
        system_swap = psutil.swap_memory()
        for metric in self._configured("system.swap.usage"):
            self._system_swap_usage_labels["state"] = metric
            if hasattr(system_swap, metric):
                observer.observe(
                    getattr(system_swap, metric),
                    self._system_swap_usage_labels,
                )

    def _get_system_swap_utilization(
        self, observer: metrics.ValueObserver
    ) -> None:
        """Observer callback for swap utilization

        Each configured state is observed as a fraction of total swap. Hosts
        without swap (total of 0) report nothing instead of raising
        ``ZeroDivisionError``.

        Args:
            observer: the observer to update
        """
        system_swap = psutil.swap_memory()
        if not system_swap.total:
            return
        for metric in self._configured("system.swap.utilization"):
            if hasattr(system_swap, metric):
                self._system_swap_utilization_labels["state"] = metric
                observer.observe(
                    getattr(system_swap, metric) / system_swap.total,
                    self._system_swap_utilization_labels,
                )

    # TODO Add _get_system_swap_page_faults
    # TODO Add _get_system_swap_page_operations

    def _get_system_disk_io(self, observer: metrics.SumObserver) -> None:
        """Observer callback for disk IO

        Observes the ``<direction>_bytes`` counter per disk and direction.

        Args:
            observer: the observer to update
        """
        for device, counters in psutil.disk_io_counters(perdisk=True).items():
            for metric in self._configured("system.disk.io"):
                if hasattr(counters, "{}_bytes".format(metric)):
                    self._system_disk_io_labels["device"] = device
                    self._system_disk_io_labels["direction"] = metric
                    observer.observe(
                        getattr(counters, "{}_bytes".format(metric)),
                        self._system_disk_io_labels,
                    )

    def _get_system_disk_operations(
        self, observer: metrics.SumObserver
    ) -> None:
        """Observer callback for disk operations

        Observes the ``<direction>_count`` counter per disk and direction.

        Args:
            observer: the observer to update
        """
        for device, counters in psutil.disk_io_counters(perdisk=True).items():
            for metric in self._configured("system.disk.operations"):
                if hasattr(counters, "{}_count".format(metric)):
                    self._system_disk_operations_labels["device"] = device
                    self._system_disk_operations_labels["direction"] = metric
                    observer.observe(
                        getattr(counters, "{}_count".format(metric)),
                        self._system_disk_operations_labels,
                    )

    def _get_system_disk_time(self, observer: metrics.SumObserver) -> None:
        """Observer callback for disk time

        Observes the ``<direction>_time`` counter per disk and direction,
        divided by 1000 (presumably psutil milliseconds -> seconds, the
        registered unit).

        Args:
            observer: the observer to update
        """
        for device, counters in psutil.disk_io_counters(perdisk=True).items():
            for metric in self._configured("system.disk.time"):
                if hasattr(counters, "{}_time".format(metric)):
                    self._system_disk_time_labels["device"] = device
                    self._system_disk_time_labels["direction"] = metric
                    observer.observe(
                        getattr(counters, "{}_time".format(metric)) / 1000,
                        self._system_disk_time_labels,
                    )

    def _get_system_disk_merged(self, observer: metrics.SumObserver) -> None:
        """Observer callback for disk merged operations

        Observes the ``<direction>_merged_count`` counter per disk and
        direction configured under ``system.disk.merged``.

        Args:
            observer: the observer to update
        """
        # FIXME The units in the spec is 1, it seems like it should be
        # operations or the value type should be Double
        for device, counters in psutil.disk_io_counters(perdisk=True).items():
            for metric in self._configured("system.disk.merged"):
                if hasattr(counters, "{}_merged_count".format(metric)):
                    self._system_disk_merged_labels["device"] = device
                    self._system_disk_merged_labels["direction"] = metric
                    observer.observe(
                        getattr(counters, "{}_merged_count".format(metric)),
                        self._system_disk_merged_labels,
                    )

    # TODO Add _get_system_filesystem_usage
    # TODO Add _get_system_filesystem_utilization
    # TODO Filesystem information can be obtained with os.statvfs in Unix-like
    # OSs, how to do the same in Windows?

    def _observe_network_counters(
        self,
        observer: metrics.SumObserver,
        config_name: str,
        suffixes: typing.Dict[str, str],
        attribute_format: str,
        labels: typing.Dict[str, typing.Any],
    ) -> None:
        """Observe one per-NIC psutil counter for every device and direction.

        Args:
            observer: the observer to update
            config_name: configuration key listing the directions to report
            suffixes: maps a direction ("receive"/"transmit") to the suffix
                psutil uses in the counter's attribute name
            attribute_format: format string that turns a suffix into the
                counter attribute name, e.g. ``"bytes_{}"``
            labels: label dict reused (and mutated) for every observation
        """
        directions = []
        for metric in self._configured(config_name):
            direction = self._DIRECTION_ALIASES.get(metric, metric)
            # Unknown directions are skipped rather than raising KeyError.
            if direction in suffixes:
                directions.append(
                    (direction, attribute_format.format(suffixes[direction]))
                )
        for device, counters in psutil.net_io_counters(pernic=True).items():
            for direction, attribute in directions:
                if hasattr(counters, attribute):
                    labels["device"] = device
                    labels["direction"] = direction
                    observer.observe(getattr(counters, attribute), labels)

    def _get_system_network_dropped_packets(
        self, observer: metrics.SumObserver
    ) -> None:
        """Observer callback for network dropped packets

        Args:
            observer: the observer to update
        """
        self._observe_network_counters(
            observer,
            "system.network.dropped.packets",
            self._IN_OUT,
            "drop{}",
            self._system_network_dropped_packets_labels,
        )

    def _get_system_network_packets(
        self, observer: metrics.SumObserver
    ) -> None:
        """Observer callback for network packets

        Args:
            observer: the observer to update
        """
        self._observe_network_counters(
            observer,
            "system.network.packets",
            self._RECV_SENT,
            "packets_{}",
            self._system_network_packets_labels,
        )

    def _get_system_network_errors(
        self, observer: metrics.SumObserver
    ) -> None:
        """Observer callback for network errors

        Args:
            observer: the observer to update
        """
        self._observe_network_counters(
            observer,
            "system.network.errors",
            self._IN_OUT,
            "err{}",
            self._system_network_errors_labels,
        )

    def _get_system_network_io(self, observer: metrics.SumObserver) -> None:
        """Observer callback for network IO

        Args:
            observer: the observer to update
        """
        self._observe_network_counters(
            observer,
            "system.network.io",
            self._RECV_SENT,
            "bytes_{}",
            self._system_network_io_labels,
        )

    def _get_system_network_connections(
        self, observer: metrics.UpDownSumObserver
    ) -> None:
        """Observer callback for network connections

        Counts the current connections grouped by protocol, status and the
        configured connection fields (e.g. ``family``, ``type``), and
        observes one count per distinct label set.

        Args:
            observer: the observer to update
        """
        # TODO How to find the device identifier for a particular
        # connection?
        fields = self._configured("system.network.connections")
        if not fields:
            return
        labels = self._system_network_connections_labels
        connection_counters = {}
        for net_connection in psutil.net_connections():
            labels["protocol"] = {1: "tcp", 2: "udp"}[
                net_connection.type.value
            ]
            labels["state"] = net_connection.status
            for field in fields:
                labels[field] = getattr(net_connection, field)
            # Key on the label set only after every label is assigned, so
            # each connection is counted exactly once with its own labels.
            connection_counters_key = get_dict_as_key(labels)
            if connection_counters_key in connection_counters:
                connection_counters[connection_counters_key]["counter"] += 1
            else:
                connection_counters[connection_counters_key] = {
                    "counter": 1,
                    "labels": labels.copy(),
                }
        for connection_counter in connection_counters.values():
            observer.observe(
                connection_counter["counter"], connection_counter["labels"],
            )

    def _get_runtime_memory(self, observer: metrics.UpDownSumObserver) -> None:
        """Observer callback for runtime memory

        Observes the configured fields of this process's memory info.

        Args:
            observer: the observer to update
        """
        proc_memory = self._proc.memory_info()
        for metric in self._configured("runtime.memory"):
            if hasattr(proc_memory, metric):
                self._runtime_memory_labels["type"] = metric
                observer.observe(
                    getattr(proc_memory, metric), self._runtime_memory_labels,
                )

    def _get_runtime_cpu_time(self, observer: metrics.SumObserver) -> None:
        """Observer callback for runtime CPU time

        Observes the configured fields of this process's CPU times.

        Args:
            observer: the observer to update
        """
        proc_cpu = self._proc.cpu_times()
        for metric in self._configured("runtime.cpu.time"):
            if hasattr(proc_cpu, metric):
                self._runtime_cpu_time_labels["type"] = metric
                observer.observe(
                    getattr(proc_cpu, metric), self._runtime_cpu_time_labels,
                )

    def _get_runtime_gc_count(
        self, observer: metrics.UpDownSumObserver
    ) -> None:
        """Observer callback for garbage collection

        Observes each value of ``gc.get_count()``, labelled with its index
        as a string.

        Args:
            observer: the observer to update
        """
        for index, count in enumerate(gc.get_count()):
            self._runtime_gc_count_labels["count"] = str(index)
            observer.observe(count, self._runtime_gc_count_labels)