diff --git a/package-lock.json b/package-lock.json index 9cb35e8090..ccd86bf6f7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -37768,6 +37768,7 @@ "license": "Apache-2.0", "dependencies": { "@opentelemetry/instrumentation": "^0.208.0", + "@opentelemetry/propagation-utils": "^0.31.3", "@opentelemetry/semantic-conventions": "^1.27.0", "@types/aws-lambda": "^8.10.155" }, diff --git a/packages/instrumentation-aws-lambda/package.json b/packages/instrumentation-aws-lambda/package.json index 5f41e58024..ef13650f54 100644 --- a/packages/instrumentation-aws-lambda/package.json +++ b/packages/instrumentation-aws-lambda/package.json @@ -54,6 +54,7 @@ }, "dependencies": { "@opentelemetry/instrumentation": "^0.208.0", + "@opentelemetry/propagation-utils": "^0.31.3", "@opentelemetry/semantic-conventions": "^1.27.0", "@types/aws-lambda": "^8.10.155" }, diff --git a/packages/instrumentation-aws-lambda/src/instrumentation.ts b/packages/instrumentation-aws-lambda/src/instrumentation.ts index a865b8575d..28dce22cf6 100644 --- a/packages/instrumentation-aws-lambda/src/instrumentation.ts +++ b/packages/instrumentation-aws-lambda/src/instrumentation.ts @@ -39,7 +39,15 @@ import { ROOT_CONTEXT, Attributes, } from '@opentelemetry/api'; -import { ATTR_URL_FULL } from '@opentelemetry/semantic-conventions'; +import { pubsubPropagation } from '@opentelemetry/propagation-utils'; +import { + ATTR_URL_FULL, + MESSAGINGOPERATIONVALUES_PROCESS, + SEMATTRS_MESSAGING_DESTINATION, + SEMATTRS_MESSAGING_MESSAGE_ID, + SEMATTRS_MESSAGING_OPERATION, + SEMATTRS_MESSAGING_SYSTEM, +} from '@opentelemetry/semantic-conventions'; import { ATTR_CLOUD_ACCOUNT_ID, ATTR_FAAS_COLDSTART } from './semconv'; import { ATTR_FAAS_EXECUTION, ATTR_FAAS_ID } from './semconv-obsolete'; @@ -48,6 +56,7 @@ import { Callback, Context, Handler, + SQSRecord, StreamifyHandler, } from 'aws-lambda'; @@ -65,6 +74,18 @@ const headerGetter: TextMapGetter = { }, }; +export const sqsContextGetter: TextMapGetter = { + keys(carrier): string[] { + if (carrier == null) { + return []; + } + return Object.keys(carrier); + }, + get(carrier, key: string) { + return carrier?.[key]?.stringValue || carrier?.[key]?.value; + }, +}; + export const lambdaMaxInitInMilliseconds = 10_000; export const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for( 'aws.lambda.runtime.handler.streaming' @@ -294,6 +315,39 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { + if (event.Records && event.Records[0].eventSource === 'aws:sqs') { + const messages = event.Records; + const queueArn = messages[0]?.eventSourceARN; + const queueName = queueArn?.split(':').pop() ?? 'unknown'; + + pubsubPropagation.patchMessagesArrayToStartProcessSpans({ + messages, + parentContext: trace.setSpan(otelContext.active(), span), + tracer: plugin.tracer, + messageToSpanDetails: (message: SQSRecord) => ({ + name: queueName, + parentContext: propagation.extract( + ROOT_CONTEXT, + message.messageAttributes || {}, + sqsContextGetter + ), + attributes: { + [SEMATTRS_MESSAGING_SYSTEM]: 'aws.sqs', + [SEMATTRS_MESSAGING_DESTINATION]: queueName, + [SEMATTRS_MESSAGING_MESSAGE_ID]: message.messageId, + [SEMATTRS_MESSAGING_OPERATION]: + MESSAGINGOPERATIONVALUES_PROCESS, + }, + }), + }); + + pubsubPropagation.patchArrayForProcessSpans( + messages, + plugin.tracer, + otelContext.active() + ); + } + // Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling // the handler and see if the result is a Promise or not. In such a case, the callback is usually ignored. If // the handler happened to both call the callback and complete a returned Promise, whichever happens first will diff --git a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts index 067a94e394..e7172329d8 100644 --- a/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts +++ b/packages/instrumentation-aws-lambda/test/integrations/lambda-handler.test.ts @@ -56,6 +56,7 @@ import { import { AWSXRayPropagator } from '@opentelemetry/propagator-aws-xray'; import { W3CTraceContextPropagator } from '@opentelemetry/core'; import { AWSXRayLambdaPropagator } from '@opentelemetry/propagator-aws-xray-lambda'; +import { sqsContextGetter } from '../../src/instrumentation'; const memoryExporter = new InMemorySpanExporter(); @@ -1236,4 +1237,113 @@ describe('lambda handler', () => { }); }); }); + + describe('sync handler sqs propagation', () => { + it('creates process span for sqs record, with lambda invocation span as parent and span link to the producer traceId and spanId', async () => { + initializeHandler('lambda-test/sync.sqshandler'); + const producerTraceId = '1df415edd0ad7f83e573f6504381dcec'; + const producerSpanId = '83b7424a259945cb'; + const sqsEvent = { + Records: [ + { + messageAttributes: { + traceparent: { + stringValue: `00-${producerTraceId}-${producerSpanId}-01`, + dataType: 'String', + }, + }, + eventSource: 'aws:sqs', + eventSourceARN: + 'arn:aws:sqs:eu-central-1:783764587482:launch-queue', + }, + ], + }; + + await lambdaRequire('lambda-test/sync').sqshandler( + sqsEvent, + ctx, + () => {} + ); + const spans = memoryExporter.getFinishedSpans(); + + assert.strictEqual(spans.length, 2); + assert.equal( + spans[0].parentSpanContext?.traceId, + spans[1].spanContext().traceId + ); + assert.equal( + spans[0].parentSpanContext?.spanId, + spans[1].spanContext().spanId + ); + assert.equal(spans[0].links[0]?.context.traceId, producerTraceId); + assert.equal(spans[0].links[0].context.spanId, producerSpanId); + }); + }); + + describe('async handler sqs propagation', () => { + it('creates process span for sqs record, with lambda invocation span as parent and span link to the producer traceId and spanId', async () => { + initializeHandler('lambda-test/async.sqshandler'); + const producerTraceId = '1df415edd0ad7f83e573f6504381dcec'; + const producerSpanId = '83b7424a259945cb'; + const sqsEvent = { + Records: [ + { + messageAttributes: { + traceparent: { + stringValue: `00-${producerTraceId}-${producerSpanId}-01`, + dataType: 'String', + }, + }, + eventSource: 'aws:sqs', + eventSourceARN: + 'arn:aws:sqs:eu-central-1:783764587482:launch-queue', + }, + ], + }; + + await lambdaRequire('lambda-test/async').sqshandler(sqsEvent, ctx); + const spans = memoryExporter.getFinishedSpans(); + + assert.strictEqual(spans.length, 2); + assert.equal( + spans[0].parentSpanContext?.traceId, + spans[1].spanContext().traceId + ); + assert.equal( + spans[0].parentSpanContext?.spanId, + spans[1].spanContext().spanId + ); + assert.equal(spans[0].links[0]?.context.traceId, producerTraceId); + assert.equal(spans[0].links[0].context.spanId, producerSpanId); + }); + }); + + describe('sqsContextGetter', () => { + it('returns the keys for a given message attributes carrier', () => { + const carrier = { + 'x-amzn-trace-id': { + stringValue: 'dummy', + stringListValues: [], + binaryListValues: [], + dataType: 'String', + }, + traceparent: { + stringValue: 'dummy', + stringListValues: [], + binaryListValues: [], + dataType: 'String', + }, + }; + + const keys = sqsContextGetter.keys(carrier); + assert.deepEqual(keys, ['x-amzn-trace-id', 'traceparent']); + }); + + it('returns empty array for null or undefined carrier', () => { + const keysNull = sqsContextGetter.keys(null); + const keysUndefined = sqsContextGetter.keys(undefined); + assert.deepEqual(keysNull, []); + assert.deepEqual(keysUndefined, []); + }); + }); }); diff --git a/packages/instrumentation-aws-lambda/test/lambda-test/async.js b/packages/instrumentation-aws-lambda/test/lambda-test/async.js index 400fa7ed94..9e84459170 100644 --- a/packages/instrumentation-aws-lambda/test/lambda-test/async.js +++ b/packages/instrumentation-aws-lambda/test/lambda-test/async.js @@ -19,6 +19,11 @@ exports.handler = async function (event, context) { return 'ok'; }; +exports.sqshandler = async function (event, context) { + event.Records.forEach(r => {}); + return 'ok'; +}; + exports.error = async function (event, context) { throw new Error('handler error'); }; diff --git a/packages/instrumentation-aws-lambda/test/lambda-test/sync.js b/packages/instrumentation-aws-lambda/test/lambda-test/sync.js index 521c5f6ea4..a4134f6b74 100644 --- a/packages/instrumentation-aws-lambda/test/lambda-test/sync.js +++ b/packages/instrumentation-aws-lambda/test/lambda-test/sync.js @@ -19,6 +19,12 @@ exports.handler = function (event, context, callback) { callback(null, 'ok'); }; +exports.sqshandler = function (event, context, callback) { + // Dummy forEach loop, to trigger sqs instrumentation + event.Records.forEach(r => {}); + callback(null, 'ok'); +}; + exports.error = function (event, context, callback) { throw new Error('handler error'); };