Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2084,6 +2084,10 @@ def _process_module_builtin_defaults():
"asyncio.base_events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_base_events"
)

_process_module_definition("asyncio.events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_events")

_process_module_definition("asyncio.runners", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_runners")

_process_module_definition(
"langchain_core.runnables.base",
"newrelic.hooks.mlmodel_langchain",
Expand Down Expand Up @@ -2671,8 +2675,6 @@ def _process_module_builtin_defaults():
"langchain_core.callbacks.manager", "newrelic.hooks.mlmodel_langchain", "instrument_langchain_callbacks_manager"
)

_process_module_definition("asyncio.events", "newrelic.hooks.coroutines_asyncio", "instrument_asyncio_events")

_process_module_definition("asgiref.sync", "newrelic.hooks.adapter_asgiref", "instrument_asgiref_sync")

_process_module_definition(
Expand Down
59 changes: 48 additions & 11 deletions newrelic/hooks/coroutines_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,73 @@
from newrelic.core.trace_cache import trace_cache


def remove_from_cache(task):
def remove_from_cache_callback(task):
cache = trace_cache()
cache.task_stop(task)


def propagate_task_context(task):
def wrap_create_task(task):
trace_cache().task_start(task)
task.add_done_callback(remove_from_cache)
task.add_done_callback(remove_from_cache_callback)
return task


def _bind_loop(loop, *args, **kwargs):
def _instrument_event_loop(loop):
if loop and hasattr(loop, "create_task") and not hasattr(loop.create_task, "__wrapped__"):
wrap_out_function(loop, "create_task", wrap_create_task)


def _bind_set_event_loop(loop, *args, **kwargs):
return loop


def wrap_create_task(wrapped, instance, args, kwargs):
loop = _bind_loop(*args, **kwargs)
def wrap_set_event_loop(wrapped, instance, args, kwargs):
loop = _bind_set_event_loop(*args, **kwargs)

if loop and not hasattr(loop.create_task, "__wrapped__"):
wrap_out_function(loop, "create_task", propagate_task_context)
_instrument_event_loop(loop)

return wrapped(*args, **kwargs)


def wrap__lazy_init(wrapped, instance, args, kwargs):
result = wrapped(*args, **kwargs)
# This logic can be used for uvloop, but should
# work for any valid custom loop factory.

# A custom loop_factory will be used to create
# a new event loop instance. It will then run
# the main() coroutine on this event loop. Once
# this coroutine is complete, the event loop will
# be stopped and closed.

# The new loop that is created and set as the
# running loop of the duration of the run() call.
# When the coroutine starts, it runs in the context
# that was active when run() was called. Any tasks
# created within this coroutine on this new event
# loop will inherit that context.

# Note: The loop created by loop_factory is never
# set as the global current loop for the thread,
# even while it is running.
loop = instance._loop
_instrument_event_loop(loop)

return result


def instrument_asyncio_base_events(module):
wrap_out_function(module, "BaseEventLoop.create_task", propagate_task_context)
wrap_out_function(module, "BaseEventLoop.create_task", wrap_create_task)


def instrument_asyncio_events(module):
if hasattr(module, "_BaseDefaultEventLoopPolicy"): # Python >= 3.14
wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task)
wrap_function_wrapper(module, "_BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop)
else: # Python <= 3.13
wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_create_task)
wrap_function_wrapper(module, "BaseDefaultEventLoopPolicy.set_event_loop", wrap_set_event_loop)


# For Python >= 3.11
def instrument_asyncio_runners(module):
if hasattr(module, "Runner") and hasattr(module.Runner, "_lazy_init"):
wrap_function_wrapper(module, "Runner._lazy_init", wrap__lazy_init)
6 changes: 3 additions & 3 deletions tests/adapter_uvicorn/test_uvicorn.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def app(request):
return request.param


@pytest.fixture
def port(app):
@pytest.fixture(params=["asyncio", "uvloop", "none"], ids=["asyncio", "uvloop", "none"])
def port(app, request):
port = get_open_port()

loops = []
Expand All @@ -72,7 +72,7 @@ def on_tick_sync():
async def on_tick():
on_tick_sync()

config = Config(app, host="127.0.0.1", port=port, loop="asyncio")
config = Config(app, host="127.0.0.1", port=port, loop=request.param)
config.callback_notify = on_tick
config.log_config = {"version": 1}
config.disable_lifespan = True
Expand Down
119 changes: 115 additions & 4 deletions tests/coroutines_asyncio/test_context_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,31 @@
import uvloop

loop_policies = (pytest.param(None, id="asyncio"), pytest.param(uvloop.EventLoopPolicy(), id="uvloop"))
uvloop_factory = (pytest.param(uvloop.new_event_loop, id="uvloop"), pytest.param(None, id="None"))
except ImportError:
loop_policies = (pytest.param(None, id="asyncio"),)
uvloop_factory = (pytest.param(None, id="None"),)


def loop_factories():
import asyncio

if sys.platform == "win32":
return (pytest.param(asyncio.ProactorEventLoop, id="asyncio.ProactorEventLoop"), *uvloop_factory)
else:
return (pytest.param(asyncio.SelectorEventLoop, id="asyncio.SelectorEventLoop"), *uvloop_factory)


@pytest.fixture(autouse=True)
def reset_event_loop():
from asyncio import set_event_loop, set_event_loop_policy
try:
from asyncio import set_event_loop, set_event_loop_policy

# Remove the loop policy to avoid side effects
set_event_loop_policy(None)
except ImportError:
from asyncio import set_event_loop

# Remove the loop policy to avoid side effects
set_event_loop_policy(None)
set_event_loop(None)


Expand Down Expand Up @@ -102,6 +117,7 @@ async def _test(asyncio, schedule, nr_enabled=True):
return trace


@pytest.mark.skipif(sys.version_info >= (3, 16), reason="loop_policy is not available")
@pytest.mark.parametrize("loop_policy", loop_policies)
@pytest.mark.parametrize("schedule", ("create_task", "ensure_future"))
@validate_transaction_metrics(
Expand Down Expand Up @@ -166,10 +182,12 @@ def handle_exception(loop, context):
memcache_trace("cmd"),
],
)
def test_two_transactions(event_loop, trace):
def test_two_transactions_with_global_event_loop(event_loop, trace):
"""
Instantiate a coroutine in one transaction and await it in
another. This should not cause any errors.
This uses the global event loop policy, which has been deprecated
since Python 3.11 and is scheduled for removal in Python 3.16.
"""
import asyncio

Expand Down Expand Up @@ -211,6 +229,99 @@ async def await_task():
event_loop.run_until_complete(asyncio.gather(afut, bfut))


@pytest.mark.skipif(sys.version_info < (3, 11), reason="asyncio.Runner is not available")
@validate_transaction_metrics("await_task", background_task=True)
@validate_transaction_metrics("create_coro", background_task=True, index=-2)
@pytest.mark.parametrize("loop_factory", loop_factories())
@pytest.mark.parametrize(
"trace",
[
function_trace(name="simple_gen"),
external_trace(library="lib", url="http://foo.com"),
database_trace("select * from foo"),
datastore_trace("lib", "foo", "bar"),
message_trace("lib", "op", "typ", "name"),
memcache_trace("cmd"),
],
)
def test_two_transactions_with_loop_factory(trace, loop_factory):
"""
Instantiate a coroutine in one transaction and await it in
another. This should not cause any errors.
Starting in Python 3.11, the asyncio.Runner class was added
as well as the loop_factory parameter. The loop_factory
parameter provides a replacement for loop policies (which
are scheduled for removal in Python 3.16).
"""
import asyncio

@trace
async def task():
pass

@background_task(name="create_coro")
async def create_coro():
return asyncio.create_task(task())

@background_task(name="await_task")
async def await_task(task_to_await):
return await task_to_await

async def _main():
_task = await create_coro()
return await await_task(_task)

with asyncio.Runner(loop_factory=loop_factory) as runner:
runner.run(_main())


@pytest.mark.skipif(sys.version_info < (3, 11), reason="loop_factory/asyncio.Runner is not available")
@pytest.mark.parametrize("loop_factory", loop_factories())
@validate_transaction_metrics(
"test_context_propagation:test_context_propagation_with_loop_factory",
background_task=True,
scoped_metrics=(("Function/waiter2", 2), ("Function/waiter3", 2)),
)
@background_task()
def test_context_propagation_with_loop_factory(loop_factory):
import asyncio

exceptions = []

def handle_exception(loop, context):
exceptions.append(context)

# Call default handler for standard logging
loop.default_exception_handler(context)

async def subtask():
with FunctionTrace(name="waiter2", terminal=True):
pass

await child()

async def _task(trace):
assert current_trace() == trace

await subtask()

trace = current_trace()

with asyncio.Runner(loop_factory=loop_factory) as runner:
assert trace == current_trace()
runner._loop.set_exception_handler(handle_exception)
runner.run(_task(trace))
runner.run(_task(trace))

# The agent should have removed all traces from the cache since
# run_until_complete has terminated (all callbacks scheduled inside the
# task have run)
assert len(trace_cache()) == 1 # Sentinel is all that remains

# # Assert that no exceptions have occurred
assert not exceptions, exceptions


# Sentinel left in cache transaction exited
async def sentinel_in_cache_txn_exited(asyncio, bg):
event = asyncio.Event()
Expand Down
8 changes: 5 additions & 3 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ envlist =
python-adapter_hypercorn-{py310,py311,py312,py313,py314}-hypercornlatest,
python-adapter_hypercorn-{py38,py39}-hypercorn{0010,0011,0012,0013},
python-adapter_mcp-{py310,py311,py312,py313,py314},
python-adapter_uvicorn-{py38,py39,py310,py311,py312,py313,py314}-uvicornlatest,
python-adapter_uvicorn-py38-uvicorn014,
python-adapter_uvicorn-{py39,py310,py311,py312,py313,py314}-uvicornlatest,
python-adapter_uvicorn-py38-uvicorn020,
python-adapter_waitress-{py38,py39,py310,py311,py312,py313,py314}-waitresslatest,
python-application_celery-{py38,py39,py310,py311,py312,py313,py314,pypy311}-celerylatest,
python-application_celery-py311-celery{0504,0503,0502},
Expand Down Expand Up @@ -239,9 +239,11 @@ deps =
adapter_hypercorn-hypercorn0010: hypercorn[h3]<0.11
adapter_hypercorn: niquests
adapter_mcp: fastmcp
adapter_uvicorn-uvicorn014: uvicorn<0.15
adapter_uvicorn-uvicorn020: uvicorn<0.21
adapter_uvicorn-uvicorn020: uvloop<0.20
adapter_uvicorn-uvicornlatest: uvicorn
adapter_uvicorn: typing-extensions
adapter_uvicorn: uvloop
adapter_waitress: WSGIProxy2
adapter_waitress-waitresslatest: waitress
agent_features: beautifulsoup4
Expand Down
Loading