diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e333303211..a6f25210bb2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4760](https://github.com/open-telemetry/opentelemetry-python/pull/4760)) - semantic-conventions: Bump to 1.38.0 ([#4791](https://github.com/open-telemetry/opentelemetry-python/pull/4791)) +- Prevent possible endless recursion from happening in `SimpleLogRecordProcessor.on_emit`, + ([#4799](https://github.com/open-telemetry/opentelemetry-python/pull/4799)). ## Version 1.38.0/0.59b0 (2025-10-16) diff --git a/opentelemetry-api/test-requirements.txt b/opentelemetry-api/test-requirements.txt index d13bcf6875c..360573104e6 100644 --- a/opentelemetry-api/test-requirements.txt +++ b/opentelemetry-api/test-requirements.txt @@ -12,5 +12,5 @@ wrapt==1.16.0 zipp==3.20.2 -e opentelemetry-sdk -e opentelemetry-semantic-conventions --e tests/opentelemetry-test-utils -e opentelemetry-api +-e tests/opentelemetry-test-utils diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index e632800c8cf..48d24eede62 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -17,6 +17,7 @@ import enum import logging import sys +import traceback from os import environ, linesep from typing import IO, Callable, Optional, Sequence @@ -49,6 +50,9 @@ _logger = logging.getLogger(__name__) _logger.addFilter(DuplicateFilter()) +_propagate_false_logger = logging.getLogger(__name__ + ".propagate.false") +_propagate_false_logger.propagate = False + class LogExportResult(enum.Enum): SUCCESS = 0 @@ -118,15 +122,40 @@ def __init__(self, exporter: LogExporter): self._shutdown = False def on_emit(self, log_data: LogData): - if self._shutdown: - _logger.warning("Processor is already shutdown, ignoring call") + # Prevent entering a recursive loop. + if ( + sum( + item.name == "on_emit" + and ( + item.filename.endswith("export/__init__.py") + or item.filename.endswith( + r"export\__init__.py" + ) # backward slash on windows.. + ) + for item in traceback.extract_stack() + ) + # Recursive depth of 3 is sort of arbitrary. It's possible that an Exporter.export call + # emits a log which returns us to this function, but when we call Exporter.export again the log + # is no longer emitted and we exit this recursive loop naturally, a depth of 3 allows some + # Exporter.export recursive log calls but exits after a few. + > 3 + ): + _propagate_false_logger.warning( + "SimpleLogRecordProcessor.on_emit has entered a recursive loop. Dropping log and exiting the loop." + ) return token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: - self._exporter.export((log_data,)) - except Exception: # pylint: disable=broad-exception-caught - _logger.exception("Exception while exporting logs.") - detach(token) + if self._shutdown: + _logger.warning("Processor is already shutdown, ignoring call") + return + + try: + self._exporter.export((log_data,)) + except Exception: # pylint: disable=broad-exception-caught + _logger.exception("Exception while exporting logs.") + finally: + detach(token) def shutdown(self): self._shutdown = True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py index cb617253698..2113af864f5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py @@ -42,8 +42,8 @@ class DuplicateFilter(logging.Filter): """Filter that can be applied to internal `logger`'s. - Currently applied to `logger`s on the export logs path that could otherwise cause endless logging of errors or a - recursion depth exceeded issue in cases where logging itself results in an exception.""" + Currently applied to `logger`s on the export logs path to prevent endlessly logging the same log + in cases where logging itself is failing.""" def filter(self, record): current_log = ( @@ -81,6 +81,13 @@ def shutdown(self): raise NotImplementedError +_logger = logging.getLogger(__name__) +_logger.addFilter(DuplicateFilter()) +_propagate_false_logger = logging.getLogger(__name__ + ".propagate.false") +_propagate_false_logger.addFilter(DuplicateFilter()) +_propagate_false_logger.propagate = False + + class BatchProcessor(Generic[Telemetry]): """This class can be used with exporter's that implement the above Exporter interface to buffer and send telemetry in batch through @@ -111,8 +118,6 @@ def __init__( target=self.worker, daemon=True, ) - self._logger = logging.getLogger(__name__) - self._logger.addFilter(DuplicateFilter()) self._exporting = exporting self._shutdown = False @@ -189,20 +194,20 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None: ] ) except Exception: # pylint: disable=broad-exception-caught - self._logger.exception( + _logger.exception( "Exception while exporting %s.", self._exporting ) detach(token) - # Do not add any logging.log statements to this function, they can be being routed back to this `emit` function, - # resulting in endless recursive calls that crash the program. - # See https://github.com/open-telemetry/opentelemetry-python/issues/4261 def emit(self, data: Telemetry) -> None: if self._shutdown: + _logger.info("Shutdown called, ignoring %s.", self._exporting) return if self._pid != os.getpid(): self._bsp_reset_once.do_once(self._at_fork_reinit) - # This will drop a log from the right side if the queue is at _max_queue_length. + if len(self._queue) == self._max_queue_size: + _logger.warning("Queue full, dropping %s.", self._exporting) + # This will drop a log from the right side if the queue is at _max_queue_size. self._queue.appendleft(data) if len(self._queue) >= self._max_export_batch_size: self._worker_awaken.set() diff --git a/opentelemetry-sdk/test-requirements.txt b/opentelemetry-sdk/test-requirements.txt index 859a2196e1a..96eb7601eed 100644 --- a/opentelemetry-sdk/test-requirements.txt +++ b/opentelemetry-sdk/test-requirements.txt @@ -11,7 +11,7 @@ tomli==2.0.1 typing_extensions==4.10.0 wrapt==1.16.0 zipp==3.19.2 --e tests/opentelemetry-test-utils -e opentelemetry-api +-e tests/opentelemetry-test-utils -e opentelemetry-semantic-conventions -e opentelemetry-sdk \ No newline at end of file diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 4b8d98693c5..369edd9d263 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -15,10 +15,11 @@ # pylint: disable=protected-access import logging import os +import sys import time import unittest from concurrent.futures import ThreadPoolExecutor -from sys import version_info +from typing import Sequence from unittest.mock import Mock, patch from pytest import mark @@ -36,6 +37,7 @@ BatchLogRecordProcessor, ConsoleLogExporter, InMemoryLogExporter, + LogExporter, SimpleLogRecordProcessor, ) from opentelemetry.sdk.environment_variables import ( @@ -61,6 +63,46 @@ class TestSimpleLogRecordProcessor(unittest.TestCase): + @mark.skipif( + sys.version_info == (3, 13), + reason="This will fail on 3.13 due to https://github.com/python/cpython/pull/131812 which prevents recursive log messages but was later rolled back.", + ) + def test_simple_log_record_processor_doesnt_enter_recursive_loop(self): + class Exporter(LogExporter): + def shutdown(self): + pass + + def export(self, batch: Sequence[LogData]): + logger = logging.getLogger("any logger..") + logger.warning("Something happened.") + + exporter = Exporter() + logger_provider = LoggerProvider() + logger_provider.add_log_record_processor( + SimpleLogRecordProcessor(exporter) + ) + root_logger = logging.getLogger() + # Add the OTLP handler to the root logger like is done in auto instrumentation. + # This causes logs generated from within SimpleLogRecordProcessor.on_emit (such as the above log in export) + # to be sent back to SimpleLogRecordProcessor.on_emit + handler = LoggingHandler( + level=logging.DEBUG, logger_provider=logger_provider + ) + root_logger.addHandler(handler) + propagate_false_logger = logging.getLogger( + "opentelemetry.sdk._logs._internal.export.propagate.false" + ) + # This would cause a max recursion depth exceeded error.. + try: + with self.assertLogs(propagate_false_logger) as cm: + root_logger.warning("hello!") + assert ( + "SimpleLogRecordProcessor.on_emit has entered a recursive loop" + in cm.output[0] + ) + finally: + root_logger.removeHandler(handler) + def test_simple_log_record_processor_default_level(self): exporter = InMemoryLogExporter() logger_provider = LoggerProvider() @@ -383,39 +425,6 @@ def bulk_emit(num_emit): time.sleep(2) assert len(exporter.get_finished_logs()) == total_expected_logs - @mark.skipif( - version_info < (3, 10), - reason="assertNoLogs only exists in python 3.10+.", - ) - def test_logging_lib_not_invoked_in_batch_log_record_emit(self): # pylint: disable=no-self-use - # See https://github.com/open-telemetry/opentelemetry-python/issues/4261 - exporter = Mock() - processor = BatchLogRecordProcessor(exporter) - logger_provider = LoggerProvider( - resource=SDKResource.create( - { - "service.name": "shoppingcart", - "service.instance.id": "instance-12", - } - ), - ) - logger_provider.add_log_record_processor(processor) - handler = LoggingHandler( - level=logging.INFO, logger_provider=logger_provider - ) - sdk_logger = logging.getLogger("opentelemetry.sdk") - # Attach OTLP handler to SDK logger - sdk_logger.addHandler(handler) - # If `emit` calls logging.log then this test will throw a maximum recursion depth exceeded exception and fail. - try: - with self.assertNoLogs(sdk_logger, logging.NOTSET): - processor.on_emit(EMPTY_LOG) - processor.shutdown() - with self.assertNoLogs(sdk_logger, logging.NOTSET): - processor.on_emit(EMPTY_LOG) - finally: - sdk_logger.removeHandler(handler) - def test_args(self): exporter = InMemoryLogExporter() log_record_processor = BatchLogRecordProcessor(