From eaf22c156ac38ac7fd9156fb06854a219904a063 Mon Sep 17 00:00:00 2001 From: warre Date: Sun, 10 Aug 2025 21:19:30 +0200 Subject: [PATCH 01/11] Propagate pubsub context when lambda event is an sqs event --- .../src/instrumentation.ts | 58 ++++++++++++++++++- .../test/integrations/lambda-handler.test.ts | 42 +++++++++++++- .../test/lambda-test/sync.js | 6 ++ 3 files changed, 104 insertions(+), 2 deletions(-) diff --git a/packages/instrumentation-aws-lambda/src/instrumentation.ts b/packages/instrumentation-aws-lambda/src/instrumentation.ts index a865b8575d..5681212b12 100644 --- a/packages/instrumentation-aws-lambda/src/instrumentation.ts +++ b/packages/instrumentation-aws-lambda/src/instrumentation.ts @@ -39,7 +39,18 @@ import { ROOT_CONTEXT, Attributes, } from '@opentelemetry/api'; -import { ATTR_URL_FULL } from '@opentelemetry/semantic-conventions'; +import { pubsubPropagation } from '@opentelemetry/propagation-utils'; +import { + ATTR_URL_FULL, + MESSAGINGDESTINATIONKINDVALUES_QUEUE, + MESSAGINGOPERATIONVALUES_PROCESS, + SEMATTRS_MESSAGING_DESTINATION, + SEMATTRS_MESSAGING_DESTINATION_KIND, + SEMATTRS_MESSAGING_MESSAGE_ID, + SEMATTRS_MESSAGING_OPERATION, + SEMATTRS_MESSAGING_SYSTEM, + SEMATTRS_MESSAGING_URL, +} from '@opentelemetry/semantic-conventions'; import { ATTR_CLOUD_ACCOUNT_ID, ATTR_FAAS_COLDSTART } from './semconv'; import { ATTR_FAAS_EXECUTION, ATTR_FAAS_ID } from './semconv-obsolete'; @@ -65,6 +76,15 @@ const headerGetter: TextMapGetter = { }, }; +const sqsContextGetter = { + keys(carrier: any): string[] { + return Object.keys(carrier || {}); + }, + get(carrier: any, key: string) { + return carrier?.[key]?.stringValue; + }, +}; + export const lambdaMaxInitInMilliseconds = 10_000; export const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for( 'aws.lambda.runtime.handler.streaming' @@ -294,6 +314,42 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { + if (event.Records) { + const messages = event.Records; + const queueArn = messages[0]?.eventSourceARN; + const queueName = queueArn?.split(':').pop() ?? 'unknown'; + + pubsubPropagation.patchMessagesArrayToStartProcessSpans({ + messages, + parentContext: trace.setSpan(otelContext.active(), span), + tracer: plugin.tracer, + messageToSpanDetails: (message: any) => ({ + name: queueName, + parentContext: propagation.extract( + ROOT_CONTEXT, + message.messageAttributes || {}, + sqsContextGetter + ), + attributes: { + [SEMATTRS_MESSAGING_SYSTEM]: 'aws.sqs', + [SEMATTRS_MESSAGING_DESTINATION]: queueName, + [SEMATTRS_MESSAGING_DESTINATION_KIND]: + MESSAGINGDESTINATIONKINDVALUES_QUEUE, + [SEMATTRS_MESSAGING_MESSAGE_ID]: message.messageId, + [SEMATTRS_MESSAGING_URL]: queueArn, + [SEMATTRS_MESSAGING_OPERATION]: + MESSAGINGOPERATIONVALUES_PROCESS, + }, + }), + }); + + pubsubPropagation.patchArrayForProcessSpans( + messages, + plugin.tracer, + otelContext.active() + ); + } + // Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling // the handler and see if the result is a Promise or not. In such a case, the callback is usually ignored. If // the handler happened to both call the callback and complete a returned Promise, whichever happens first will diff --git a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts index 067a94e394..92ad328aa4 100644 --- a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts +++ b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts @@ -33,7 +33,7 @@ import { ReadableSpan, } from '@opentelemetry/sdk-trace-base'; import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; -import { Context } from 'aws-lambda'; +import { APIGatewayProxyEvent, Context, SQSEvent } from 'aws-lambda'; import * as assert from 'assert'; import { ATTR_URL_FULL, @@ -1236,4 +1236,44 @@ describe('lambda handler', () => { }); }); }); + + describe('sqs test', () => { + it('creates process span for sqs record, with lambda invocation span as parent and span link to the producer traceId and spanId', async () => { + initializeHandler('lambda-test/sync.sqshandler'); + const producerTraceId = '1df415edd0ad7f83e573f6504381dcec'; + const producerSpanId = '83b7424a259945cb'; + const event = { + Records: [ + { + messageAttributes: { + traceparent: { + stringValue: `00-${producerTraceId}-${producerSpanId}-01`, + dataType: 'String', + }, + }, + eventSource: 'aws:sqs', + eventSourceARN: + 'arn:aws:sqs:eu-central-1:783764587482:launch-queue', + }, + ], + }; + + await lambdaRequire('lambda-test/sync').sqshandler(event, ctx, () => {}); + const spans = memoryExporter.getFinishedSpans(); + + assert.strictEqual(spans.length, 2); + + assert.equal( + spans[0].parentSpanContext?.traceId, + spans[1].spanContext().traceId + ); + assert.equal( + spans[0].parentSpanContext?.spanId, + spans[1].spanContext().spanId + ); + + assert.equal(spans[0].links[0]?.context.traceId, producerTraceId); + assert.equal(spans[0].links[0].context.spanId, producerSpanId); + }); + }); }); diff --git a/packages/instrumentation-aws-lambda/test/lambda-test/sync.js b/packages/instrumentation-aws-lambda/test/lambda-test/sync.js index 521c5f6ea4..5bd569faa9 100644 --- a/packages/instrumentation-aws-lambda/test/lambda-test/sync.js +++ b/packages/instrumentation-aws-lambda/test/lambda-test/sync.js @@ -19,6 +19,12 @@ exports.handler = function (event, context, callback) { callback(null, 'ok'); }; +exports.sqshandler = function (event, context, callback) { + // Dummy forEach loop, to trigger sqs instrumentation + event.Records.forEach(_r => {}); + callback(null, 'ok'); +}; + exports.error = function (event, context, callback) { throw new Error('handler error'); }; From 0d971939b48623a391da4de32f05a5bd6371a88d Mon Sep 17 00:00:00 2001 From: warre Date: Tue, 12 Aug 2025 19:09:36 +0200 Subject: [PATCH 02/11] Rename loop var, underscores are not used to mark unused vars elsewhere in this code --- packages/instrumentation-aws-lambda/test/lambda-test/sync.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/instrumentation-aws-lambda/test/lambda-test/sync.js b/packages/instrumentation-aws-lambda/test/lambda-test/sync.js index 5bd569faa9..a4134f6b74 100644 --- a/packages/instrumentation-aws-lambda/test/lambda-test/sync.js +++ b/packages/instrumentation-aws-lambda/test/lambda-test/sync.js @@ -21,7 +21,7 @@ exports.handler = function (event, context, callback) { exports.sqshandler = function (event, context, callback) { // Dummy forEach loop, to trigger sqs instrumentation - event.Records.forEach(_r => {}); + event.Records.forEach(r => {}); callback(null, 'ok'); }; From 5b4c13794b96ba26f2821e926bc2372258f43329 Mon Sep 17 00:00:00 2001 From: warre Date: Tue, 12 Aug 2025 19:23:43 +0200 Subject: [PATCH 03/11] Add sqs handler test for promise style async handler --- .../test/integrations/lambda-handler.test.ts | 40 ++++++++++++++++++- .../test/lambda-test/async.js | 5 +++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts index 92ad328aa4..0677ffe76e 100644 --- a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts +++ b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts @@ -1237,7 +1237,7 @@ describe('lambda handler', () => { }); }); - describe('sqs test', () => { + describe('sync handler sqs propagation', () => { it('creates process span for sqs record, with lambda invocation span as parent and span link to the producer traceId and spanId', async () => { initializeHandler('lambda-test/sync.sqshandler'); const producerTraceId = '1df415edd0ad7f83e573f6504381dcec'; @@ -1276,4 +1276,42 @@ describe('lambda handler', () => { assert.equal(spans[0].links[0].context.spanId, producerSpanId); }); }); + + describe('async handler sqs propagation', () => { + it('creates process span for sqs record, with lambda invocation span as parent and span link to the producer traceId and spanId', async () => { + initializeHandler('lambda-test/async.sqshandler'); + const producerTraceId = '1df415edd0ad7f83e573f6504381dcec'; + const producerSpanId = '83b7424a259945cb'; + const event = { + Records: [ + { + messageAttributes: { + traceparent: { + stringValue: `00-${producerTraceId}-${producerSpanId}-01`, + dataType: 'String', + }, + }, + eventSource: 'aws:sqs', + eventSourceARN: + 'arn:aws:sqs:eu-central-1:783764587482:launch-queue', + }, + ], + }; + + await lambdaRequire('lambda-test/async').sqshandler(event, ctx); + const spans = memoryExporter.getFinishedSpans(); + + assert.strictEqual(spans.length, 2); + assert.equal( + spans[0].parentSpanContext?.traceId, + spans[1].spanContext().traceId + ); + assert.equal( + spans[0].parentSpanContext?.spanId, + spans[1].spanContext().spanId + ); + assert.equal(spans[0].links[0]?.context.traceId, producerTraceId); + assert.equal(spans[0].links[0].context.spanId, producerSpanId); + }); + }); }); diff --git a/packages/instrumentation-aws-lambda/test/lambda-test/async.js b/packages/instrumentation-aws-lambda/test/lambda-test/async.js index 400fa7ed94..9e84459170 100644 --- a/packages/instrumentation-aws-lambda/test/lambda-test/async.js +++ b/packages/instrumentation-aws-lambda/test/lambda-test/async.js @@ -19,6 +19,11 @@ exports.handler = async function (event, context) { return 'ok'; }; +exports.sqshandler = async function (event, context) { + event.Records.forEach(r => {}); + return 'ok'; +}; + exports.error = async function (event, context) { throw new Error('handler error'); }; From ee28e447dea62324eda509d1d41ea65fc7002e0f Mon Sep 17 00:00:00 2001 From: warre Date: Tue, 12 Aug 2025 23:54:48 +0200 Subject: [PATCH 04/11] Return empty array from sqsContextGetter keys method if carrier is null. And fallback to message attribute value if stringValue is undefined --- .../src/instrumentation.ts | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/instrumentation-aws-lambda/src/instrumentation.ts b/packages/instrumentation-aws-lambda/src/instrumentation.ts index 5681212b12..a685b28796 100644 --- a/packages/instrumentation-aws-lambda/src/instrumentation.ts +++ b/packages/instrumentation-aws-lambda/src/instrumentation.ts @@ -76,12 +76,15 @@ const headerGetter: TextMapGetter = { }, }; -const sqsContextGetter = { - keys(carrier: any): string[] { - return Object.keys(carrier || {}); +const sqsContextGetter: TextMapGetter = { + keys(carrier): string[] { + if (carrier == null) { + return []; + } + return Object.keys(carrier); }, - get(carrier: any, key: string) { - return carrier?.[key]?.stringValue; + get(carrier, key: string) { + return carrier?.[key]?.stringValue || carrier?.[key]?.value; }, }; From 39a8908a7499406794896ce0aa0429e74c8a66e6 Mon Sep 17 00:00:00 2001 From: warre Date: Wed, 13 Aug 2025 00:09:03 +0200 Subject: [PATCH 05/11] Type message param as SQSRecord in anonymous function used to build SpanDetails --- packages/instrumentation-aws-lambda/src/instrumentation.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/instrumentation-aws-lambda/src/instrumentation.ts b/packages/instrumentation-aws-lambda/src/instrumentation.ts index a685b28796..d48215c460 100644 --- a/packages/instrumentation-aws-lambda/src/instrumentation.ts +++ b/packages/instrumentation-aws-lambda/src/instrumentation.ts @@ -59,6 +59,7 @@ import { Callback, Context, Handler, + SQSRecord, StreamifyHandler, } from 'aws-lambda'; @@ -326,7 +327,7 @@ export class AwsLambdaInstrumentation extends InstrumentationBase ({ + messageToSpanDetails: (message: SQSRecord) => ({ name: queueName, parentContext: propagation.extract( ROOT_CONTEXT, From 2d26a05b74e52986211dff37ac259b27e7245b50 Mon Sep 17 00:00:00 2001 From: warre Date: Wed, 13 Aug 2025 00:09:26 +0200 Subject: [PATCH 06/11] Refactor and format lambda handler tests --- .../test/integrations/lambda-handler.test.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts index 0677ffe76e..510880083c 100644 --- a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts +++ b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts @@ -33,7 +33,7 @@ import { ReadableSpan, } from '@opentelemetry/sdk-trace-base'; import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; -import { APIGatewayProxyEvent, Context, SQSEvent } from 'aws-lambda'; +import { Context } from 'aws-lambda'; import * as assert from 'assert'; import { ATTR_URL_FULL, @@ -1242,7 +1242,7 @@ describe('lambda handler', () => { initializeHandler('lambda-test/sync.sqshandler'); const producerTraceId = '1df415edd0ad7f83e573f6504381dcec'; const producerSpanId = '83b7424a259945cb'; - const event = { + const sqsEvent = { Records: [ { messageAttributes: { @@ -1258,11 +1258,14 @@ describe('lambda handler', () => { ], }; - await lambdaRequire('lambda-test/sync').sqshandler(event, ctx, () => {}); + await lambdaRequire('lambda-test/sync').sqshandler( + sqsEvent, + ctx, + () => {} + ); const spans = memoryExporter.getFinishedSpans(); assert.strictEqual(spans.length, 2); - assert.equal( spans[0].parentSpanContext?.traceId, spans[1].spanContext().traceId @@ -1271,7 +1274,6 @@ describe('lambda handler', () => { spans[0].parentSpanContext?.spanId, spans[1].spanContext().spanId ); - assert.equal(spans[0].links[0]?.context.traceId, producerTraceId); assert.equal(spans[0].links[0].context.spanId, producerSpanId); }); @@ -1282,7 +1284,7 @@ describe('lambda handler', () => { initializeHandler('lambda-test/async.sqshandler'); const producerTraceId = '1df415edd0ad7f83e573f6504381dcec'; const producerSpanId = '83b7424a259945cb'; - const event = { + const sqsEvent = { Records: [ { messageAttributes: { @@ -1298,7 +1300,7 @@ describe('lambda handler', () => { ], }; - await lambdaRequire('lambda-test/async').sqshandler(event, ctx); + await lambdaRequire('lambda-test/async').sqshandler(sqsEvent, ctx); const spans = memoryExporter.getFinishedSpans(); assert.strictEqual(spans.length, 2); From ee602fc0a24dce68ce860382106e6431c607de0f Mon Sep 17 00:00:00 2001 From: warre Date: Wed, 13 Aug 2025 23:56:01 +0200 Subject: [PATCH 07/11] Add propagation-utils dependency in instrumentation-aws-lambda package --- packages/instrumentation-aws-lambda/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/instrumentation-aws-lambda/package.json b/packages/instrumentation-aws-lambda/package.json index 5f41e58024..ef13650f54 100644 --- a/packages/instrumentation-aws-lambda/package.json +++ b/packages/instrumentation-aws-lambda/package.json @@ -54,6 +54,7 @@ }, "dependencies": { "@opentelemetry/instrumentation": "^0.208.0", + "@opentelemetry/propagation-utils": "^0.31.3", "@opentelemetry/semantic-conventions": "^1.27.0", "@types/aws-lambda": "^8.10.155" }, From 543e48448b9d73acf6e6e3f08bdbd1ff556aaec8 Mon Sep 17 00:00:00 2001 From: warre Date: Sun, 17 Aug 2025 00:17:30 +0200 Subject: [PATCH 08/11] Update root level lockfile --- package-lock.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package-lock.json b/package-lock.json index 9cb35e8090..ccd86bf6f7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -37768,6 +37768,7 @@ "license": "Apache-2.0", "dependencies": { "@opentelemetry/instrumentation": "^0.208.0", + "@opentelemetry/propagation-utils": "^0.31.3", "@opentelemetry/semantic-conventions": "^1.27.0", "@types/aws-lambda": "^8.10.155" }, From 34426337f72e3aba8e6390a1917931e60bb7e67d Mon Sep 17 00:00:00 2001 From: warre Date: Sun, 17 Aug 2025 00:26:23 +0200 Subject: [PATCH 09/11] Look at first record eventSource in aws lambda event Records array to determine if it is an sqs event --- packages/instrumentation-aws-lambda/src/instrumentation.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/instrumentation-aws-lambda/src/instrumentation.ts b/packages/instrumentation-aws-lambda/src/instrumentation.ts index d48215c460..7903dd25ea 100644 --- a/packages/instrumentation-aws-lambda/src/instrumentation.ts +++ b/packages/instrumentation-aws-lambda/src/instrumentation.ts @@ -318,7 +318,7 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { - if (event.Records) { + if (event.Records && event.Records[0].eventSource === 'aws:sqs') { const messages = event.Records; const queueArn = messages[0]?.eventSourceARN; const queueName = queueArn?.split(':').pop() ?? 'unknown'; From edb68956e81575545b83aa8e96089f25f9178e17 Mon Sep 17 00:00:00 2001 From: warre Date: Sun, 17 Aug 2025 22:59:58 +0200 Subject: [PATCH 10/11] Remove span attributes that are no longer part of the semantic conventions spec for messaging spans --- packages/instrumentation-aws-lambda/src/instrumentation.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/instrumentation-aws-lambda/src/instrumentation.ts b/packages/instrumentation-aws-lambda/src/instrumentation.ts index 7903dd25ea..5edfaf3574 100644 --- a/packages/instrumentation-aws-lambda/src/instrumentation.ts +++ b/packages/instrumentation-aws-lambda/src/instrumentation.ts @@ -42,14 +42,11 @@ import { import { pubsubPropagation } from '@opentelemetry/propagation-utils'; import { ATTR_URL_FULL, - MESSAGINGDESTINATIONKINDVALUES_QUEUE, MESSAGINGOPERATIONVALUES_PROCESS, SEMATTRS_MESSAGING_DESTINATION, - SEMATTRS_MESSAGING_DESTINATION_KIND, SEMATTRS_MESSAGING_MESSAGE_ID, SEMATTRS_MESSAGING_OPERATION, SEMATTRS_MESSAGING_SYSTEM, - SEMATTRS_MESSAGING_URL, } from '@opentelemetry/semantic-conventions'; import { ATTR_CLOUD_ACCOUNT_ID, ATTR_FAAS_COLDSTART } from './semconv'; import { ATTR_FAAS_EXECUTION, ATTR_FAAS_ID } from './semconv-obsolete'; @@ -337,10 +334,7 @@ export class AwsLambdaInstrumentation extends InstrumentationBase Date: Sun, 17 Aug 2025 23:44:33 +0200 Subject: [PATCH 11/11] Test sqsContextGetter keys method --- .../src/instrumentation.ts | 2 +- .../test/integrations/lambda-handler.test.ts | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/packages/instrumentation-aws-lambda/src/instrumentation.ts b/packages/instrumentation-aws-lambda/src/instrumentation.ts index 5edfaf3574..28dce22cf6 100644 --- a/packages/instrumentation-aws-lambda/src/instrumentation.ts +++ b/packages/instrumentation-aws-lambda/src/instrumentation.ts @@ -74,7 +74,7 @@ const headerGetter: TextMapGetter = { }, }; -const sqsContextGetter: TextMapGetter = { +export const sqsContextGetter: TextMapGetter = { keys(carrier): string[] { if (carrier == null) { return []; diff --git a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts index 510880083c..e7172329d8 100644 --- a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts +++ b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts @@ -56,6 +56,7 @@ import { import { AWSXRayPropagator } from '@opentelemetry/propagator-aws-xray'; import { W3CTraceContextPropagator } from '@opentelemetry/core'; import { AWSXRayLambdaPropagator } from '@opentelemetry/propagator-aws-xray-lambda'; +import { sqsContextGetter } from '../../src/instrumentation'; const memoryExporter = new InMemorySpanExporter(); @@ -1316,4 +1317,33 @@ describe('lambda handler', () => { assert.equal(spans[0].links[0].context.spanId, producerSpanId); }); }); + + describe('sqsContextGetter', () => { + it('returns the keys for a given message attributes carrier', () => { + const carrier = { + 'x-amzn-trace-id': { + stringValue: 'dummy', + stringListValues: [], + binaryListValues: [], + dataType: 'String', + }, + traceparent: { + stringValue: 'dummy', + stringListValues: [], + binaryListValues: [], + dataType: 'String', + }, + }; + + const keys = sqsContextGetter.keys(carrier); + assert.deepEqual(keys, ['x-amzn-trace-id', 'traceparent']); + }); + + it('returns empty array for null or undefined carrier', () => { + const keysNull = sqsContextGetter.keys(null); + const keysUndefined = sqsContextGetter.keys(undefined); + assert.deepEqual(keysNull, []); + assert.deepEqual(keysUndefined, []); + }); + }); });