From 779c7a9b01bccf494ceb14e822768629e4ec2d78 Mon Sep 17 00:00:00 2001 From: Lalleh Rafeei Date: Fri, 7 Nov 2025 19:30:43 -0800 Subject: [PATCH 1/7] Runner instrumentation in asyncio --- newrelic/config.py | 6 ++-- newrelic/hooks/coroutines_asyncio.py | 42 ++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/newrelic/config.py b/newrelic/config.py index 21ce996f6c..6b0127df33 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -2083,6 +2083,10 @@ def _process_module_builtin_defaults(): _process_module_definition( "asyncio.base_events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_base_events" ) + + _process_module_definition("asyncio.events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_events") + + _process_module_definition("asyncio.runners", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_runners") _process_module_definition( "langchain_core.runnables.base", @@ -2671,8 +2675,6 @@ def _process_module_builtin_defaults(): "langchain_core.callbacks.manager", "newrelic.hooks.mlmodel_langchain", "instrument_langchain_callbacks_manager" ) - _process_module_definition("asyncio.events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_events") - _process_module_definition("asgiref.sync", "newrelic.hooks.adapter_asgiref", "instrument_asgiref_sync") _process_module_definition( diff --git a/newrelic/hooks/coroutines_asyncio.py b/newrelic/hooks/coroutines_asyncio.py index 41fc776595..3ff3c6d464 100644 --- a/newrelic/hooks/coroutines_asyncio.py +++ b/newrelic/hooks/coroutines_asyncio.py @@ -40,6 +40,41 @@ def wrap_create_task(wrapped, instance, args, kwargs): return wrapped(*args, **kwargs) +def wrap__lazy_init(wrapped, instance, args, kwargs): + result = wrapped(*args, **kwargs) + # This logic can be used for uvloop, but should + # work for any valid custom loop factory. + + if not instance._set_event_loop: + # A custom loop_factory will be used to create + # a new event loop instance. It will then run + # the main() coroutine on this event loop. Once + # this coroutine is complete, the event loop will + # be stopped and closed. + + # The new loop that is created and set as the + # running loop of the duration of the run() call. + # When the coroutine starts, it runs in the context + # that was active when run() was called. Any tasks + # created within this coroutine on this new event + # loop will inherit that context. + + # Note: The loop created by loop_factory is never + # set as the global current loop for the thread, + # even while it is running. + loop = instance._loop + if loop and not hasattr(loop.create_task, "__wrapped__"): + wrap_out_function(loop, "create_task", propagate_task_context) + + # If `instance._set_event_loop` has been set, + # the event loop has been set by the default + # `set_event_loop()` method, which has been + # accounted for in our instrumentation. + + return result # Does not actually return anything + + + def instrument_asyncio_base_events(module): wrap_out_function(module, "BaseEventLoop.create_task", propagate_task_context) @@ -49,3 +84,10 @@ def instrument_asyncio_events(module): wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task) else: # Python <= 3.13 wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task) + + +# For Python >= 3.11 +def instrument_asyncio_runners(module): + if hasattr(module, "Runner") and hasattr(module.Runner, "_lazy_init"): + wrap_function_wrapper(module, "Runner._lazy_init", wrap__lazy_init) + From c9c7cae1216a3a0012a0fa9ea543570e38132f4c Mon Sep 17 00:00:00 2001 From: Lalleh Rafeei Date: Mon, 10 Nov 2025 14:46:48 -0800 Subject: [PATCH 2/7] Clean up asyncio instrumentation --- newrelic/hooks/coroutines_asyncio.py | 72 +++++++++++++--------------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/newrelic/hooks/coroutines_asyncio.py b/newrelic/hooks/coroutines_asyncio.py index 3ff3c6d464..451c8d0f6a 100644 --- a/newrelic/hooks/coroutines_asyncio.py +++ b/newrelic/hooks/coroutines_asyncio.py @@ -16,26 +16,30 @@ from newrelic.core.trace_cache import trace_cache -def remove_from_cache(task): +def remove_from_cache_callback(task): cache = trace_cache() cache.task_stop(task) -def propagate_task_context(task): +def wrap_create_task(task): trace_cache().task_start(task) - task.add_done_callback(remove_from_cache) + task.add_done_callback(remove_from_cache_callback) return task -def _bind_loop(loop, *args, **kwargs): +def _instrument_event_loop(loop): + if loop and hasattr(loop, "create_task") and not hasattr(loop.create_task, "__wrapped__"): + wrap_out_function(loop, "create_task", wrap_create_task) + + +def _bind_set_event_loop(loop, *args, **kwargs): return loop -def wrap_create_task(wrapped, instance, args, kwargs): - loop = _bind_loop(*args, **kwargs) +def wrap_set_event_loop(wrapped, instance, args, kwargs): + loop = _bind_set_event_loop(*args, **kwargs) - if loop and not hasattr(loop.create_task, "__wrapped__"): - wrap_out_function(loop, "create_task", propagate_task_context) + _instrument_event_loop(loop) return wrapped(*args, **kwargs) @@ -45,45 +49,37 @@ def wrap__lazy_init(wrapped, instance, args, kwargs): # This logic can be used for uvloop, but should # work for any valid custom loop factory. - if not instance._set_event_loop: - # A custom loop_factory will be used to create - # a new event loop instance. It will then run - # the main() coroutine on this event loop. Once - # this coroutine is complete, the event loop will - # be stopped and closed. - - # The new loop that is created and set as the - # running loop of the duration of the run() call. - # When the coroutine starts, it runs in the context - # that was active when run() was called. Any tasks - # created within this coroutine on this new event - # loop will inherit that context. - - # Note: The loop created by loop_factory is never - # set as the global current loop for the thread, - # even while it is running. - loop = instance._loop - if loop and not hasattr(loop.create_task, "__wrapped__"): - wrap_out_function(loop, "create_task", propagate_task_context) - - # If `instance._set_event_loop` has been set, - # the event loop has been set by the default - # `set_event_loop()` method, which has been - # accounted for in our instrumentation. + # A custom loop_factory will be used to create + # a new event loop instance. It will then run + # the main() coroutine on this event loop. Once + # this coroutine is complete, the event loop will + # be stopped and closed. + + # The new loop that is created and set as the + # running loop of the duration of the run() call. + # When the coroutine starts, it runs in the context + # that was active when run() was called. Any tasks + # created within this coroutine on this new event + # loop will inherit that context. + + # Note: The loop created by loop_factory is never + # set as the global current loop for the thread, + # even while it is running. + loop = instance._loop + _instrument_event_loop(loop) - return result # Does not actually return anything - + return result def instrument_asyncio_base_events(module): - wrap_out_function(module, "BaseEventLoop.create_task", propagate_task_context) + wrap_out_function(module, "BaseEventLoop.create_task", wrap_create_task) def instrument_asyncio_events(module): if hasattr(module, "_BaseDefaultEventLoopPolicy"): # Python >= 3.14 - wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task) + wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop) else: # Python <= 3.13 - wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task) + wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop) # For Python >= 3.11 From 14b1df6687f01a92e43107bd66d78637965a619c Mon Sep 17 00:00:00 2001 From: Lalleh Rafeei Date: Mon, 10 Nov 2025 18:55:59 -0800 Subject: [PATCH 3/7] Add asyncio tests for loop_factory --- .../test_context_propagation.py | 131 +++++++++++++++++- 1 file changed, 127 insertions(+), 4 deletions(-) diff --git a/tests/coroutines_asyncio/test_context_propagation.py b/tests/coroutines_asyncio/test_context_propagation.py index b338b6ec3e..0933b0074f 100644 --- a/tests/coroutines_asyncio/test_context_propagation.py +++ b/tests/coroutines_asyncio/test_context_propagation.py @@ -36,16 +36,31 @@ import uvloop loop_policies = (pytest.param(None, id="asyncio"), pytest.param(uvloop.EventLoopPolicy(), id="uvloop")) + uvloop_factory = (pytest.param(uvloop.new_event_loop, id="uvloop"), pytest.param(None, id="None")) except ImportError: loop_policies = (pytest.param(None, id="asyncio"),) + uvloop_factory = (pytest.param(None, id="None"),) + + +def loop_factories(): + import asyncio + + if sys.platform == "win32": + return (pytest.param(asyncio.ProactorEventLoop, id="asyncio.ProactorEventLoop"),) + uvloop_factory + else: + return (pytest.param(asyncio.SelectorEventLoop, id="asyncio.SelectorEventLoop"),) + uvloop_factory @pytest.fixture(autouse=True) def reset_event_loop(): - from asyncio import set_event_loop, set_event_loop_policy + try: + from asyncio import set_event_loop, set_event_loop_policy + + # Remove the loop policy to avoid side effects + set_event_loop_policy(None) + except ImportError: + from asyncio import set_event_loop - # Remove the loop policy to avoid side effects - set_event_loop_policy(None) set_event_loop(None) @@ -102,6 +117,7 @@ async def _test(asyncio, schedule, nr_enabled=True): return trace +@pytest.mark.skipif(sys.version_info >= (3, 16), reason="loop_policy is not available") @pytest.mark.parametrize("loop_policy", loop_policies) @pytest.mark.parametrize("schedule", ("create_task", "ensure_future")) @validate_transaction_metrics( @@ -166,10 +182,12 @@ def handle_exception(loop, context): memcache_trace("cmd"), ], ) -def test_two_transactions(event_loop, trace): +def test_two_transactions_with_global_event_loop(event_loop, trace): """ Instantiate a coroutine in one transaction and await it in another. This should not cause any errors. + This uses the global event loop policy, which has been deprecated + since Python 3.11 and is scheduled for removal in Python 3.16. """ import asyncio @@ -211,6 +229,111 @@ async def await_task(): event_loop.run_until_complete(asyncio.gather(afut, bfut)) +@pytest.mark.skipif(sys.version_info < (3, 11), reason="asyncio.Runner is not available") +@validate_transaction_metrics( + "await_task", + background_task=True, +) +@validate_transaction_metrics( + "create_coro", + background_task=True, + index=-2, +) +@pytest.mark.parametrize("loop_factory", loop_factories()) +@pytest.mark.parametrize( + "trace", + [ + function_trace(name="simple_gen"), + external_trace(library="lib", url="http://foo.com"), + database_trace("select * from foo"), + datastore_trace("lib", "foo", "bar"), + message_trace("lib", "op", "typ", "name"), + memcache_trace("cmd"), + ], +) +def test_two_transactions_with_loop_factory(trace, loop_factory): + """ + Instantiate a coroutine in one transaction and await it in + another. This should not cause any errors. + Starting in Python 3.11, the asyncio.Runner class was added + as well as the loop_factory parameter. The loop_factory + parameter provides a replacement for loop policies (which + are scheduled for removal in Python 3.16). + """ + import asyncio + + + @trace + async def task(): + pass + + + @background_task(name="create_coro") + async def create_coro(): + return asyncio.create_task(task()) + + + @background_task(name="await_task") + async def await_task(task_to_await): + return await task_to_await + + + async def _main(): + _task = await create_coro() + return await await_task(_task) + + + with asyncio.Runner(loop_factory=loop_factory) as runner: + runner.run(_main()) + + +@pytest.mark.skipif(sys.version_info < (3, 11), reason="loop_factory/asyncio.Runner is not available") +@pytest.mark.parametrize("loop_factory", loop_factories()) +@validate_transaction_metrics( + "test_context_propagation:test_context_propagation_with_loop_factory", + background_task=True, + scoped_metrics=(("Function/waiter2", 2), ("Function/waiter3", 2)), +) +@background_task() +def test_context_propagation_with_loop_factory(loop_factory): + import asyncio + + exceptions = [] + def handle_exception(loop, context): + exceptions.append(context) + + # Call default handler for standard logging + loop.default_exception_handler(context) + + async def subtask(): + with FunctionTrace(name="waiter2", terminal=True): + pass + + await child() + + async def _task(trace): + assert current_trace() == trace + + await subtask() + + + trace = current_trace() + + with asyncio.Runner(loop_factory=loop_factory) as runner: + assert trace == current_trace() + runner._loop.set_exception_handler(handle_exception) + runner.run(_task(trace)) + runner.run(_task(trace)) + + # The agent should have removed all traces from the cache since + # run_until_complete has terminated (all callbacks scheduled inside the + # task have run) + assert len(trace_cache()) == 1 # Sentinel is all that remains + + # # Assert that no exceptions have occurred + assert not exceptions, exceptions + + # Sentinel left in cache transaction exited async def sentinel_in_cache_txn_exited(asyncio, bg): event = asyncio.Event() From 8b95a557ed0aec8bcbe96b8beb8a42cde3ffcf0b Mon Sep 17 00:00:00 2001 From: Lalleh Rafeei Date: Mon, 10 Nov 2025 18:56:36 -0800 Subject: [PATCH 4/7] Modify uvicorn test for loop_factory --- tests/adapter_uvicorn/test_uvicorn.py | 6 +++--- tox.ini | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/adapter_uvicorn/test_uvicorn.py b/tests/adapter_uvicorn/test_uvicorn.py index 0084be3e46..d5db2d6ca6 100644 --- a/tests/adapter_uvicorn/test_uvicorn.py +++ b/tests/adapter_uvicorn/test_uvicorn.py @@ -56,8 +56,8 @@ def app(request): return request.param -@pytest.fixture -def port(app): +@pytest.fixture(params=["asyncio", "uvloop", "none"], ids=["asyncio", "uvloop", "none"]) +def port(app, request): port = get_open_port() loops = [] @@ -72,7 +72,7 @@ def on_tick_sync(): async def on_tick(): on_tick_sync() - config = Config(app, host="127.0.0.1", port=port, loop="asyncio") + config = Config(app, host="127.0.0.1", port=port, loop=request.param) config.callback_notify = on_tick config.log_config = {"version": 1} config.disable_lifespan = True diff --git a/tox.ini b/tox.ini index 39148b657f..f5f04d44fb 100644 --- a/tox.ini +++ b/tox.ini @@ -116,8 +116,8 @@ envlist = python-adapter_hypercorn-{py38,py39,py310,py311,py312,py313,py314}-hypercornlatest, python-adapter_hypercorn-py38-hypercorn{0010,0011,0012,0013}, python-adapter_mcp-{py310,py311,py312,py313,py314}, - python-adapter_uvicorn-{py38,py39,py310,py311,py312,py313,py314}-uvicornlatest, - python-adapter_uvicorn-py38-uvicorn014, + python-adapter_uvicorn-{py39,py310,py311,py312,py313,py314}-uvicornlatest, + python-adapter_uvicorn-py38-uvicorn020, python-adapter_waitress-{py38,py39,py310,py311,py312,py313,py314}-waitresslatest, python-application_celery-{py38,py39,py310,py311,py312,py313,py314,pypy311}-celerylatest, python-application_celery-py311-celery{0504,0503,0502}, @@ -238,9 +238,11 @@ deps = adapter_hypercorn-hypercorn0010: hypercorn[h3]<0.11 adapter_hypercorn: niquests adapter_mcp: fastmcp - adapter_uvicorn-uvicorn014: uvicorn<0.15 + adapter_uvicorn-uvicorn020: uvicorn<0.21 + adapter_uvicorn-uvicorn020: uvloop<0.20 adapter_uvicorn-uvicornlatest: uvicorn adapter_uvicorn: typing-extensions + adapter_uvicorn: uvloop adapter_waitress: WSGIProxy2 adapter_waitress-waitresslatest: waitress agent_features: beautifulsoup4 From 330ed8aab69cd665f4f7c0942d94f78a690080e5 Mon Sep 17 00:00:00 2001 From: Lalleh Rafeei Date: Tue, 11 Nov 2025 15:28:04 -0800 Subject: [PATCH 5/7] Fix linter errors --- tests/coroutines_asyncio/test_context_propagation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/coroutines_asyncio/test_context_propagation.py b/tests/coroutines_asyncio/test_context_propagation.py index 0933b0074f..98387efb72 100644 --- a/tests/coroutines_asyncio/test_context_propagation.py +++ b/tests/coroutines_asyncio/test_context_propagation.py @@ -46,9 +46,9 @@ def loop_factories(): import asyncio if sys.platform == "win32": - return (pytest.param(asyncio.ProactorEventLoop, id="asyncio.ProactorEventLoop"),) + uvloop_factory + return (pytest.param(asyncio.ProactorEventLoop, id="asyncio.ProactorEventLoop"), *uvloop_factory) else: - return (pytest.param(asyncio.SelectorEventLoop, id="asyncio.SelectorEventLoop"),) + uvloop_factory + return (pytest.param(asyncio.SelectorEventLoop, id="asyncio.SelectorEventLoop"), *uvloop_factory) @pytest.fixture(autouse=True) From f7cd0ec098b8cfcde793e2f5af359c4fd64e7273 Mon Sep 17 00:00:00 2001 From: lrafeei <84813886+lrafeei@users.noreply.github.com> Date: Tue, 11 Nov 2025 23:29:43 +0000 Subject: [PATCH 6/7] [MegaLinter] Apply linters fixes --- newrelic/config.py | 2 +- newrelic/hooks/coroutines_asyncio.py | 5 ++- .../test_context_propagation.py | 34 ++++++------------- 3 files changed, 14 insertions(+), 27 deletions(-) diff --git a/newrelic/config.py b/newrelic/config.py index 6b0127df33..c2b7b5c2d6 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -2083,7 +2083,7 @@ def _process_module_builtin_defaults(): _process_module_definition( "asyncio.base_events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_base_events" ) - + _process_module_definition("asyncio.events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_events") _process_module_definition("asyncio.runners", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_runners") diff --git a/newrelic/hooks/coroutines_asyncio.py b/newrelic/hooks/coroutines_asyncio.py index 451c8d0f6a..72bb2e7fc8 100644 --- a/newrelic/hooks/coroutines_asyncio.py +++ b/newrelic/hooks/coroutines_asyncio.py @@ -48,7 +48,7 @@ def wrap__lazy_init(wrapped, instance, args, kwargs): result = wrapped(*args, **kwargs) # This logic can be used for uvloop, but should # work for any valid custom loop factory. - + # A custom loop_factory will be used to create # a new event loop instance. It will then run # the main() coroutine on this event loop. Once @@ -67,7 +67,7 @@ def wrap__lazy_init(wrapped, instance, args, kwargs): # even while it is running. loop = instance._loop _instrument_event_loop(loop) - + return result @@ -86,4 +86,3 @@ def instrument_asyncio_events(module): def instrument_asyncio_runners(module): if hasattr(module, "Runner") and hasattr(module.Runner, "_lazy_init"): wrap_function_wrapper(module, "Runner._lazy_init", wrap__lazy_init) - diff --git a/tests/coroutines_asyncio/test_context_propagation.py b/tests/coroutines_asyncio/test_context_propagation.py index 98387efb72..eb5c358745 100644 --- a/tests/coroutines_asyncio/test_context_propagation.py +++ b/tests/coroutines_asyncio/test_context_propagation.py @@ -44,7 +44,7 @@ def loop_factories(): import asyncio - + if sys.platform == "win32": return (pytest.param(asyncio.ProactorEventLoop, id="asyncio.ProactorEventLoop"), *uvloop_factory) else: @@ -55,7 +55,7 @@ def loop_factories(): def reset_event_loop(): try: from asyncio import set_event_loop, set_event_loop_policy - + # Remove the loop policy to avoid side effects set_event_loop_policy(None) except ImportError: @@ -230,15 +230,8 @@ async def await_task(): @pytest.mark.skipif(sys.version_info < (3, 11), reason="asyncio.Runner is not available") -@validate_transaction_metrics( - "await_task", - background_task=True, -) -@validate_transaction_metrics( - "create_coro", - background_task=True, - index=-2, -) +@validate_transaction_metrics("await_task", background_task=True) +@validate_transaction_metrics("create_coro", background_task=True, index=-2) @pytest.mark.parametrize("loop_factory", loop_factories()) @pytest.mark.parametrize( "trace", @@ -262,26 +255,21 @@ def test_two_transactions_with_loop_factory(trace, loop_factory): """ import asyncio - @trace async def task(): pass - - + @background_task(name="create_coro") async def create_coro(): return asyncio.create_task(task()) - - + @background_task(name="await_task") async def await_task(task_to_await): return await task_to_await - async def _main(): _task = await create_coro() return await await_task(_task) - with asyncio.Runner(loop_factory=loop_factory) as runner: runner.run(_main()) @@ -299,12 +287,13 @@ def test_context_propagation_with_loop_factory(loop_factory): import asyncio exceptions = [] + def handle_exception(loop, context): exceptions.append(context) - + # Call default handler for standard logging loop.default_exception_handler(context) - + async def subtask(): with FunctionTrace(name="waiter2", terminal=True): pass @@ -313,12 +302,11 @@ async def subtask(): async def _task(trace): assert current_trace() == trace - - await subtask() + await subtask() trace = current_trace() - + with asyncio.Runner(loop_factory=loop_factory) as runner: assert trace == current_trace() runner._loop.set_exception_handler(handle_exception) From 8b8547e4259831774144f2e29a2cb11ca319fb41 Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Mon, 17 Nov 2025 16:41:19 -0800 Subject: [PATCH 7/7] Apply suggestions from code review --- newrelic/hooks/coroutines_asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/newrelic/hooks/coroutines_asyncio.py b/newrelic/hooks/coroutines_asyncio.py index 72bb2e7fc8..6f862d52dd 100644 --- a/newrelic/hooks/coroutines_asyncio.py +++ b/newrelic/hooks/coroutines_asyncio.py @@ -78,7 +78,7 @@ def instrument_asyncio_base_events(module): def instrument_asyncio_events(module): if hasattr(module, "_BaseDefaultEventLoopPolicy"): # Python >= 3.14 wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop) - else: # Python <= 3.13 + elif hasattr(module, "BaseDefaultEventLoopPolicy"): # Python <= 3.13 wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop)