diff --git a/newrelic/config.py b/newrelic/config.py index 21ce996f6c..c2b7b5c2d6 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -2084,6 +2084,10 @@ def _process_module_builtin_defaults(): "asyncio.base_events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_base_events" ) + _process_module_definition("asyncio.events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_events") + + _process_module_definition("asyncio.runners", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_runners") + _process_module_definition( "langchain_core.runnables.base", "newrelic.hooks.mlmodel_langchain", @@ -2671,8 +2675,6 @@ def _process_module_builtin_defaults(): "langchain_core.callbacks.manager", "newrelic.hooks.mlmodel_langchain", "instrument_langchain_callbacks_manager" ) - _process_module_definition("asyncio.events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_events") - _process_module_definition("asgiref.sync", "newrelic.hooks.adapter_asgiref", "instrument_asgiref_sync") _process_module_definition( diff --git a/newrelic/hooks/coroutines_asyncio.py b/newrelic/hooks/coroutines_asyncio.py index 41fc776595..72bb2e7fc8 100644 --- a/newrelic/hooks/coroutines_asyncio.py +++ b/newrelic/hooks/coroutines_asyncio.py @@ -16,36 +16,73 @@ from newrelic.core.trace_cache import trace_cache -def remove_from_cache(task): +def remove_from_cache_callback(task): cache = trace_cache() cache.task_stop(task) -def propagate_task_context(task): +def wrap_create_task(task): trace_cache().task_start(task) - task.add_done_callback(remove_from_cache) + task.add_done_callback(remove_from_cache_callback) return task -def _bind_loop(loop, *args, **kwargs): +def _instrument_event_loop(loop): + if loop and hasattr(loop, "create_task") and not hasattr(loop.create_task, "__wrapped__"): + wrap_out_function(loop, "create_task", wrap_create_task) + + +def _bind_set_event_loop(loop, *args, **kwargs): return loop -def wrap_create_task(wrapped, instance, args, kwargs): - loop = _bind_loop(*args, **kwargs) +def wrap_set_event_loop(wrapped, instance, args, kwargs): + loop = _bind_set_event_loop(*args, **kwargs) - if loop and not hasattr(loop.create_task, "__wrapped__"): - wrap_out_function(loop, "create_task", propagate_task_context) + _instrument_event_loop(loop) return wrapped(*args, **kwargs) +def wrap__lazy_init(wrapped, instance, args, kwargs): + result = wrapped(*args, **kwargs) + # This logic can be used for uvloop, but should + # work for any valid custom loop factory. + + # A custom loop_factory will be used to create + # a new event loop instance. It will then run + # the main() coroutine on this event loop. Once + # this coroutine is complete, the event loop will + # be stopped and closed. + + # The new loop that is created and set as the + # running loop of the duration of the run() call. + # When the coroutine starts, it runs in the context + # that was active when run() was called. Any tasks + # created within this coroutine on this new event + # loop will inherit that context. + + # Note: The loop created by loop_factory is never + # set as the global current loop for the thread, + # even while it is running. + loop = instance._loop + _instrument_event_loop(loop) + + return result + + def instrument_asyncio_base_events(module): - wrap_out_function(module, "BaseEventLoop.create_task", propagate_task_context) + wrap_out_function(module, "BaseEventLoop.create_task", wrap_create_task) def instrument_asyncio_events(module): if hasattr(module, "_BaseDefaultEventLoopPolicy"): # Python >= 3.14 - wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task) + wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop) else: # Python <= 3.13 - wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task) + wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop) + + +# For Python >= 3.11 +def instrument_asyncio_runners(module): + if hasattr(module, "Runner") and hasattr(module.Runner, "_lazy_init"): + wrap_function_wrapper(module, "Runner._lazy_init", wrap__lazy_init) diff --git a/tests/adapter_uvicorn/test_uvicorn.py b/tests/adapter_uvicorn/test_uvicorn.py index 0084be3e46..d5db2d6ca6 100644 --- a/tests/adapter_uvicorn/test_uvicorn.py +++ b/tests/adapter_uvicorn/test_uvicorn.py @@ -56,8 +56,8 @@ def app(request): return request.param -@pytest.fixture -def port(app): +@pytest.fixture(params=["asyncio", "uvloop", "none"], ids=["asyncio", "uvloop", "none"]) +def port(app, request): port = get_open_port() loops = [] @@ -72,7 +72,7 @@ def on_tick_sync(): async def on_tick(): on_tick_sync() - config = Config(app, host="127.0.0.1", port=port, loop="asyncio") + config = Config(app, host="127.0.0.1", port=port, loop=request.param) config.callback_notify = on_tick config.log_config = {"version": 1} config.disable_lifespan = True diff --git a/tests/coroutines_asyncio/test_context_propagation.py b/tests/coroutines_asyncio/test_context_propagation.py index b338b6ec3e..eb5c358745 100644 --- a/tests/coroutines_asyncio/test_context_propagation.py +++ b/tests/coroutines_asyncio/test_context_propagation.py @@ -36,16 +36,31 @@ import uvloop loop_policies = (pytest.param(None, id="asyncio"), pytest.param(uvloop.EventLoopPolicy(), id="uvloop")) + uvloop_factory = (pytest.param(uvloop.new_event_loop, id="uvloop"), pytest.param(None, id="None")) except ImportError: loop_policies = (pytest.param(None, id="asyncio"),) + uvloop_factory = (pytest.param(None, id="None"),) + + +def loop_factories(): + import asyncio + + if sys.platform == "win32": + return (pytest.param(asyncio.ProactorEventLoop, id="asyncio.ProactorEventLoop"), *uvloop_factory) + else: + return (pytest.param(asyncio.SelectorEventLoop, id="asyncio.SelectorEventLoop"), *uvloop_factory) @pytest.fixture(autouse=True) def reset_event_loop(): - from asyncio import set_event_loop, set_event_loop_policy + try: + from asyncio import set_event_loop, set_event_loop_policy + + # Remove the loop policy to avoid side effects + set_event_loop_policy(None) + except ImportError: + from asyncio import set_event_loop - # Remove the loop policy to avoid side effects - set_event_loop_policy(None) set_event_loop(None) @@ -102,6 +117,7 @@ async def _test(asyncio, schedule, nr_enabled=True): return trace +@pytest.mark.skipif(sys.version_info >= (3, 16), reason="loop_policy is not available") @pytest.mark.parametrize("loop_policy", loop_policies) @pytest.mark.parametrize("schedule", ("create_task", "ensure_future")) @validate_transaction_metrics( @@ -166,10 +182,12 @@ def handle_exception(loop, context): memcache_trace("cmd"), ], ) -def test_two_transactions(event_loop, trace): +def test_two_transactions_with_global_event_loop(event_loop, trace): """ Instantiate a coroutine in one transaction and await it in another. This should not cause any errors. + This uses the global event loop policy, which has been deprecated + since Python 3.11 and is scheduled for removal in Python 3.16. """ import asyncio @@ -211,6 +229,99 @@ async def await_task(): event_loop.run_until_complete(asyncio.gather(afut, bfut)) +@pytest.mark.skipif(sys.version_info < (3, 11), reason="asyncio.Runner is not available") +@validate_transaction_metrics("await_task", background_task=True) +@validate_transaction_metrics("create_coro", background_task=True, index=-2) +@pytest.mark.parametrize("loop_factory", loop_factories()) +@pytest.mark.parametrize( + "trace", + [ + function_trace(name="simple_gen"), + external_trace(library="lib", url="http://foo.com"), + database_trace("select * from foo"), + datastore_trace("lib", "foo", "bar"), + message_trace("lib", "op", "typ", "name"), + memcache_trace("cmd"), + ], +) +def test_two_transactions_with_loop_factory(trace, loop_factory): + """ + Instantiate a coroutine in one transaction and await it in + another. This should not cause any errors. + Starting in Python 3.11, the asyncio.Runner class was added + as well as the loop_factory parameter. The loop_factory + parameter provides a replacement for loop policies (which + are scheduled for removal in Python 3.16). + """ + import asyncio + + @trace + async def task(): + pass + + @background_task(name="create_coro") + async def create_coro(): + return asyncio.create_task(task()) + + @background_task(name="await_task") + async def await_task(task_to_await): + return await task_to_await + + async def _main(): + _task = await create_coro() + return await await_task(_task) + + with asyncio.Runner(loop_factory=loop_factory) as runner: + runner.run(_main()) + + +@pytest.mark.skipif(sys.version_info < (3, 11), reason="loop_factory/asyncio.Runner is not available") +@pytest.mark.parametrize("loop_factory", loop_factories()) +@validate_transaction_metrics( + "test_context_propagation:test_context_propagation_with_loop_factory", + background_task=True, + scoped_metrics=(("Function/waiter2", 2), ("Function/waiter3", 2)), +) +@background_task() +def test_context_propagation_with_loop_factory(loop_factory): + import asyncio + + exceptions = [] + + def handle_exception(loop, context): + exceptions.append(context) + + # Call default handler for standard logging + loop.default_exception_handler(context) + + async def subtask(): + with FunctionTrace(name="waiter2", terminal=True): + pass + + await child() + + async def _task(trace): + assert current_trace() == trace + + await subtask() + + trace = current_trace() + + with asyncio.Runner(loop_factory=loop_factory) as runner: + assert trace == current_trace() + runner._loop.set_exception_handler(handle_exception) + runner.run(_task(trace)) + runner.run(_task(trace)) + + # The agent should have removed all traces from the cache since + # run_until_complete has terminated (all callbacks scheduled inside the + # task have run) + assert len(trace_cache()) == 1 # Sentinel is all that remains + + # # Assert that no exceptions have occurred + assert not exceptions, exceptions + + # Sentinel left in cache transaction exited async def sentinel_in_cache_txn_exited(asyncio, bg): event = asyncio.Event() diff --git a/tox.ini b/tox.ini index e27ce2ef83..98cea6ee29 100644 --- a/tox.ini +++ b/tox.ini @@ -116,8 +116,8 @@ envlist = python-adapter_hypercorn-{py310,py311,py312,py313,py314}-hypercornlatest, python-adapter_hypercorn-{py38,py39}-hypercorn{0010,0011,0012,0013}, python-adapter_mcp-{py310,py311,py312,py313,py314}, - python-adapter_uvicorn-{py38,py39,py310,py311,py312,py313,py314}-uvicornlatest, - python-adapter_uvicorn-py38-uvicorn014, + python-adapter_uvicorn-{py39,py310,py311,py312,py313,py314}-uvicornlatest, + python-adapter_uvicorn-py38-uvicorn020, python-adapter_waitress-{py38,py39,py310,py311,py312,py313,py314}-waitresslatest, python-application_celery-{py38,py39,py310,py311,py312,py313,py314,pypy311}-celerylatest, python-application_celery-py311-celery{0504,0503,0502}, @@ -239,9 +239,11 @@ deps = adapter_hypercorn-hypercorn0010: hypercorn[h3]<0.11 adapter_hypercorn: niquests adapter_mcp: fastmcp - adapter_uvicorn-uvicorn014: uvicorn<0.15 + adapter_uvicorn-uvicorn020: uvicorn<0.21 + adapter_uvicorn-uvicorn020: uvloop<0.20 adapter_uvicorn-uvicornlatest: uvicorn adapter_uvicorn: typing-extensions + adapter_uvicorn: uvloop adapter_waitress: WSGIProxy2 adapter_waitress-waitresslatest: waitress agent_features: beautifulsoup4