Skip to content

Commit 5c62b43

Browse files
khpeetumaannamalai
andauthored
Camunda Pyzeebe Instrumentation (#1385)
* add initial pyzeebe instrumentation/tests * second attempt at tests (failing) * updated tests - 1/2 passing locally * combine function_trace client tests * fix: py agent team feedback #1 * fix: pyzeebe tests * chore: ruff formatting * fix: review updates #2 * fix: next round of updates * fix: more updates based on feedback * fix: resource parameter capture * chore: ruff lint fixes * fix: no txn tests;add resourceCount attr back --------- Co-authored-by: Uma Annamalai <uannamalai@newrelic.com>
1 parent 500a6ef commit 5c62b43

File tree

9 files changed

+501
-0
lines changed

9 files changed

+501
-0
lines changed

newrelic/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4126,6 +4126,8 @@ def _process_module_builtin_defaults():
41264126
"newrelic.hooks.framework_azurefunctions",
41274127
"instrument_azure_functions_worker_dispatcher",
41284128
)
4129+
_process_module_definition("pyzeebe.client.client", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_client_client")
4130+
_process_module_definition("pyzeebe.worker.job_executor", "newrelic.hooks.external_pyzeebe", "instrument_pyzeebe_worker_job_executor")
41294131

41304132

41314133
def _process_module_entry_points():

newrelic/core/attribute.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@
100100
"response.headers.contentType",
101101
"response.status",
102102
"server.address",
103+
"zeebe.client.bpmnProcessId",
104+
"zeebe.client.messageName",
105+
"zeebe.client.correlationKey",
106+
"zeebe.client.messageId",
107+
"zeebe.client.resourceCount",
108+
"zeebe.client.resourceFile",
103109
}
104110

105111
MAX_NUM_USER_ATTRIBUTES = 128

newrelic/hooks/external_pyzeebe.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Copyright 2010 New Relic, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
import logging
17+
18+
from newrelic.api.application import application_instance
19+
from newrelic.api.web_transaction import WebTransaction
20+
from newrelic.api.function_trace import FunctionTrace
21+
from newrelic.api.transaction import current_transaction
22+
from newrelic.common.object_wrapper import wrap_function_wrapper
23+
24+
_logger = logging.getLogger(__name__)
25+
26+
CLIENT_ATTRIBUTES_DEPLOY_RESOURCE_LOG_MSG = "Exception occurred in PyZeebe instrumentation: Failed to extract resource count/file for method `deploy_resource`. Report this issue to New Relic support."
27+
28+
29+
# Adds client method params as txn or span attributes
30+
def _add_client_input_attributes(method_name, trace, args, kwargs):
31+
bpmn_id = extract_agent_attribute_from_methods(args, kwargs, method_name, ("run_process", "run_process_with_result"), "bpmn_process_id", 0)
32+
if bpmn_id:
33+
trace._add_agent_attribute("zeebe.client.bpmnProcessId", bpmn_id)
34+
35+
msg_name = extract_agent_attribute_from_methods(args, kwargs, method_name, ("publish_message"), "name", 0)
36+
if msg_name:
37+
trace._add_agent_attribute("zeebe.client.messageName", msg_name)
38+
39+
correlation_key = extract_agent_attribute_from_methods(args, kwargs, method_name, ("publish_message"), "correlation_key", 1)
40+
if correlation_key:
41+
trace._add_agent_attribute("zeebe.client.correlationKey", correlation_key)
42+
43+
message_id = extract_agent_attribute_from_methods(args, kwargs, method_name, ("publish_message"), "message_id", 4)
44+
if message_id:
45+
trace._add_agent_attribute("zeebe.client.messageId", message_id)
46+
47+
resource = extract_agent_attribute_from_methods(args, {}, method_name, ("deploy_resource"), None, 0)
48+
if resource:
49+
try:
50+
trace._add_agent_attribute("zeebe.client.resourceFile", resource)
51+
trace._add_agent_attribute("zeebe.client.resourceCount", len(list(args)))
52+
except Exception:
53+
_logger.warning(CLIENT_ATTRIBUTES_DEPLOY_RESOURCE_LOG_MSG, exc_info=True)
54+
55+
56+
def extract_agent_attribute_from_methods(args, kwargs, method_name, methods, param, index):
57+
try:
58+
if method_name in methods:
59+
value = kwargs.get(param)
60+
if not value and args and len(args) > index:
61+
value = args[index]
62+
return value
63+
except Exception:
64+
_logger.warning("Exception occurred in PyZeebe instrumentation: failed to extract %s from %s. Report this issue to New Relic support.", param, method_name, exc_info=True)
65+
66+
# Async wrapper that instruments router/worker annotations`
67+
async def _nr_wrapper_execute_one_job(wrapped, instance, args, kwargs):
68+
job = args[0] if args else kwargs.get("job")
69+
process_id = getattr(job, "bpmn_process_id", None) or "UnknownProcess"
70+
task_type = getattr(job, "type", None) or "UnknownType"
71+
txn_name = f"{process_id}/{task_type}"
72+
73+
with WebTransaction(application_instance(), txn_name, group="ZeebeTask") as txn:
74+
if job is not None:
75+
if hasattr(job, "key"):
76+
txn.add_custom_attribute("zeebe.job.key", job.key)
77+
if hasattr(job, "type"):
78+
txn.add_custom_attribute("zeebe.job.type", job.type)
79+
if hasattr(job, "bpmn_process_id"):
80+
txn.add_custom_attribute("zeebe.job.bpmnProcessId", job.bpmn_process_id)
81+
if hasattr(job, "process_instance_key"):
82+
txn.add_custom_attribute("zeebe.job.processInstanceKey", job.process_instance_key)
83+
if hasattr(job, "element_id"):
84+
txn.add_custom_attribute("zeebe.job.elementId", job.element_id)
85+
86+
return await wrapped(*args, **kwargs)
87+
88+
89+
# Async wrapper that instruments a ZeebeClient method.
90+
def _nr_client_wrapper(method_name):
91+
async def _client_wrapper(wrapped, instance, args, kwargs):
92+
txn = current_transaction()
93+
if not txn:
94+
return await wrapped(*args, **kwargs)
95+
96+
with FunctionTrace(name=method_name, group="ZeebeClient") as trace:
97+
_add_client_input_attributes(method_name, trace, args, kwargs)
98+
return await wrapped(*args, **kwargs)
99+
100+
return _client_wrapper
101+
102+
103+
# Instrument JobExecutor.execute_one_job to create a background transaction per job (invoked from @router.task or @worker.task annotations)
104+
def instrument_pyzeebe_worker_job_executor(module):
105+
if hasattr(module, "JobExecutor"):
106+
wrap_function_wrapper(module, "JobExecutor.execute_one_job", _nr_wrapper_execute_one_job)
107+
108+
109+
# Instrument ZeebeClient methods to trace client calls.
110+
def instrument_pyzeebe_client_client(module):
111+
target_methods = ("run_process", "run_process_with_result", "deploy_resource", "publish_message")
112+
113+
for method_name in target_methods:
114+
if hasattr(module, "ZeebeClient"):
115+
if hasattr(module.ZeebeClient, method_name):
116+
wrap_function_wrapper(module, f"ZeebeClient.{method_name}", _nr_client_wrapper(method_name))

tests/external_pyzeebe/_mocks.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Copyright 2010 New Relic, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from types import SimpleNamespace
17+
18+
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
19+
20+
# Dummy response objects with only required fields
21+
DummyCreateProcessInstanceResponse = SimpleNamespace(process_instance_key=12345)
22+
23+
DummyCreateProcessInstanceWithResultResponse = SimpleNamespace(
24+
process_instance_key=45678, variables={"result": "success"}
25+
)
26+
27+
DummyDeployResourceResponse = SimpleNamespace(key=67890, deployments=[], tenant_id=None)
28+
29+
DummyPublishMessageResponse = SimpleNamespace(key=99999, tenant_id=None)
30+
31+
32+
# Dummy RPC stub coroutines
33+
async def dummy_create_process_instance(
34+
self, bpmn_process_id: str, variables: dict = None, version: int = -1, tenant_id: str = None # noqa: RUF013
35+
):
36+
"""Simulate ZeebeAdapter.create_process_instance"""
37+
return DummyCreateProcessInstanceResponse
38+
39+
40+
async def dummy_create_process_instance_with_result(
41+
self,
42+
bpmn_process_id: str,
43+
variables: dict = None, # noqa: RUF013
44+
version: int = -1,
45+
timeout: int = 0,
46+
variables_to_fetch=None,
47+
tenant_id: str = None, # noqa: RUF013
48+
):
49+
"""Simulate ZeebeAdapter.create_process_instance_with_result"""
50+
return DummyCreateProcessInstanceWithResultResponse
51+
52+
53+
async def dummy_deploy_resource(*resource_file_path: str, tenant_id: str = None): # noqa: RUF013
54+
"""Simulate ZeebeAdapter.deploy_resource"""
55+
# Create dummy deployment metadata for each provided resource path
56+
deployments = [
57+
SimpleNamespace(
58+
resource_name=str(path),
59+
bpmn_process_id="dummy_process",
60+
process_definition_key=123,
61+
version=1,
62+
tenant_id=tenant_id if tenant_id is not None else None,
63+
)
64+
for path in resource_file_path
65+
]
66+
# Create a dummy response with a list of deployments
67+
return SimpleNamespace(
68+
deployment_key=333333, deployments=deployments, tenant_id=tenant_id if tenant_id is not None else None
69+
)
70+
71+
72+
async def dummy_publish_message(
73+
self,
74+
name: str,
75+
correlation_key: str,
76+
variables: dict = None, # noqa: RUF013
77+
time_to_live_in_milliseconds: int = 60000,
78+
message_id: str = None, # noqa: RUF013
79+
tenant_id: str = None, # noqa: RUF013
80+
):
81+
"""Simulate ZeebeAdapter.publish_message"""
82+
# Return the dummy response (contains message key)
83+
return SimpleNamespace(key=999999, tenant_id=tenant_id if tenant_id is not None else None)
84+
85+
86+
async def dummy_complete_job(self, job_key: int, variables: dict):
87+
"""Simulate JobExecutor.complete_job"""
88+
self._last_complete = {"job_key": job_key, "variables": variables}
89+
return None
90+
91+
92+
class DummyZeebeAdapter(ZeebeAdapter):
93+
"""Simulate a ZeebeAdapter so JobExecutor can be instatiated w/o gRPC channel"""
94+
95+
def __init__(self):
96+
self.completed_job_key = None
97+
self.completed_job_vars = None
98+
99+
async def complete_job(self, job_key: int, variables: dict):
100+
self.completed_job_key = job_key
101+
self.completed_job_vars = variables
102+
return None

tests/external_pyzeebe/conftest.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Copyright 2010 New Relic, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from testing_support.fixture.event_loop import event_loop as loop
16+
from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture
17+
18+
_default_settings = {
19+
"package_reporting.enabled": False, # Turn off package reporting for testing as it causes slow downs.
20+
"transaction_tracer.explain_threshold": 0.0,
21+
"transaction_tracer.transaction_threshold": 0.0,
22+
"transaction_tracer.stack_trace_threshold": 0.0,
23+
"debug.log_data_collector_payloads": True,
24+
"debug.record_transaction_failure": True,
25+
}
26+
27+
collector_agent_registration = collector_agent_registration_fixture(
28+
app_name="Python Agent Test (external_pyzeebe)", default_settings=_default_settings
29+
)

tests/external_pyzeebe/test.bpmn

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<definitions
3+
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
4+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
5+
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
6+
xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
7+
xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
8+
targetNamespace="http://example.com/bpmn"
9+
xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL BPMN20.xsd">
10+
11+
<!-- Define the process with a unique id and name -->
12+
<process id="dummyProcess" name="Dummy Process" isExecutable="true">
13+
<!-- Start Event -->
14+
<startEvent id="StartEvent_1" name="Start"/>
15+
16+
<!-- A simple Service Task representing work -->
17+
<serviceTask id="ServiceTask_1" name="Perform Work"/>
18+
19+
<!-- End Event -->
20+
<endEvent id="EndEvent_1" name="End"/>
21+
22+
<!-- Sequence Flows connecting Start → Service Task → End -->
23+
<sequenceFlow id="Flow_1" sourceRef="StartEvent_1" targetRef="ServiceTask_1"/>
24+
<sequenceFlow id="Flow_2" sourceRef="ServiceTask_1" targetRef="EndEvent_1"/>
25+
</process>
26+
27+
<!-- (Optional) BPMNDiagram section can be added for graphical layout, but omitted here -->
28+
</definitions>

0 commit comments

Comments
 (0)