From 0b688aae8c5a9dc4f693465ee0f8733e7bcfb557 Mon Sep 17 00:00:00 2001 From: Amin Alaee Date: Mon, 22 Dec 2025 11:28:23 +0000 Subject: [PATCH 1/2] Add SQS data source attributes --- aws_lambda_opentelemetry/trace/helpers.py | 4 +- aws_lambda_opentelemetry/utils.py | 169 +++++++++++++++++----- tests/test_utils.py | 76 ++++++---- 3 files changed, 185 insertions(+), 64 deletions(-) diff --git a/aws_lambda_opentelemetry/trace/helpers.py b/aws_lambda_opentelemetry/trace/helpers.py index 80de448..ee84cbc 100644 --- a/aws_lambda_opentelemetry/trace/helpers.py +++ b/aws_lambda_opentelemetry/trace/helpers.py @@ -10,7 +10,7 @@ ) from aws_lambda_opentelemetry.typing.context import LambdaContext -from aws_lambda_opentelemetry.utils import set_lambda_handler_attributes +from aws_lambda_opentelemetry.utils import set_handler_attributes def instrument_handler(**kwargs): @@ -54,7 +54,7 @@ def wrapper(event: dict, context: LambdaContext): span.record_exception(exc) raise finally: - set_lambda_handler_attributes(event, context, span) + set_handler_attributes(event, context, span) finally: provider.force_flush() diff --git a/aws_lambda_opentelemetry/utils.py b/aws_lambda_opentelemetry/utils.py index 593d5a6..d652b71 100644 --- a/aws_lambda_opentelemetry/utils.py +++ b/aws_lambda_opentelemetry/utils.py @@ -1,3 +1,4 @@ +import enum import os from opentelemetry.semconv._incubating.attributes.cloud_attributes import ( @@ -15,6 +16,34 @@ FaasInvokedProviderValues, FaasTriggerValues, ) +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_BATCH_MESSAGE_COUNT, + MESSAGING_DESTINATION_NAME, + MESSAGING_OPERATION, + MESSAGING_SYSTEM, + MessagingOperationTypeValues, +) + +# from opentelemetry.semconv.attributes.http_attributes import ( +# HTTP_REQUEST_METHOD, +# HTTP_ROUTE, +# ) +# from opentelemetry.semconv.attributes.server_attributes import ( +# SERVER_ADDRESS, +# ) +# from opentelemetry.semconv.attributes.url_attributes import ( +# URL_PATH, +# URL_SCHEME, +# ) +# from opentelemetry.semconv.attributes.user_agent_attributes import ( +# USER_AGENT_ORIGINAL, +# ) +# from opentelemetry.semconv.attributes.client_attributes import ( +# CLIENT_ADDRESS, +# ) +# from opentelemetry.semconv.attributes.network_attributes import ( +# NETWORK_PROTOCOL_VERSION, +# ) from opentelemetry.trace import Span from aws_lambda_opentelemetry import constants @@ -23,11 +52,28 @@ _is_cold_start = True -def set_lambda_handler_attributes(event: dict, context: LambdaContext, span: Span): +class AwsDataSource(enum.Enum): + API_GATEWAY = "aws.api_gateway" + HTTP_API = "aws.http_api" + ELB = "aws.elb" + SQS = "aws.sqs" + SNS = "aws.sns" + S3 = "aws.s3" + DYNAMODB = "aws.dynamodb" + KINESIS = "aws.kinesis" + EVENT_BRIDGE = "aws.event_bridge" + CLOUDWATCH_LOGS = "aws.cloudwatch_logs" + OTHER = "aws.other" + + +def set_handler_attributes(event: dict, context: LambdaContext, span: Span): """ Set standard AWS Lambda attributes on the given span. """ + data_source_mapper = DataSourceAttributeMapper(event) + + span.set_attributes(data_source_mapper.attributes) span.set_attributes( { FAAS_INVOCATION_ID: context.aws_request_id, @@ -37,45 +83,98 @@ def set_lambda_handler_attributes(event: dict, context: LambdaContext, span: Spa FAAS_MAX_MEMORY: context.memory_limit_in_mb, FAAS_VERSION: context.function_version, FAAS_COLDSTART: _check_cold_start(), - FAAS_TRIGGER: get_lambda_datasource(event).value, + FAAS_TRIGGER: data_source_mapper.faas_trigger.value, CLOUD_RESOURCE_ID: context.invoked_function_arn, } ) -def get_lambda_datasource(event: dict) -> FaasTriggerValues: - """ - Extract the data source from the Lambda event. - """ - - # HTTP triggers - http_keys = ["apiId", "http", "elb"] - if "requestContext" in event: - if any(key in event["requestContext"] for key in http_keys): - return FaasTriggerValues.HTTP - - # EventBridge - if "source" in event and "detail-type" in event: - if event["detail-type"] == "Scheduled Event": - return FaasTriggerValues.TIMER - return FaasTriggerValues.PUBSUB - - # SNS/SQS/S3/DynamoDB/Kinesis - if "Records" in event and len(event["Records"]) > 0: - record = event["Records"][0] - event_source = record.get("eventSource") - - if event_source in {"aws:sns", "aws:sqs"}: - return FaasTriggerValues.PUBSUB - - if event_source in {"aws:s3", "aws:dynamodb", "aws:kinesis"}: - return FaasTriggerValues.DATASOURCE - - # CloudWatch Logs - if "awslogs" in event and "data" in event["awslogs"]: - return FaasTriggerValues.DATASOURCE - - return FaasTriggerValues.OTHER +class DataSourceAttributeMapper: + def __init__(self, event: dict): + self.event = event + self.data_source, self.faas_trigger = self.get_sources() + + @property + def attributes(self) -> dict: + if self.data_source == AwsDataSource.SQS: + return self._get_sqs_attributes() + # if self.data_source == AwsDataSource.SNS: + # ... + # if self.data_source == AwsDataSource.S3: + # ... + # if self.data_source == AwsDataSource.DYNAMODB: + # ... + # if self.data_source == AwsDataSource.KINESIS: + # ... + # if self.data_source == AwsDataSource.EVENT_BRIDGE: + # ... + # if self.data_source == AwsDataSource.CLOUDWATCH_LOGS: + # ... + # if self.data_source == AwsDataSource.API_GATEWAY: + # ... + # if self.data_source == AwsDataSource.HTTP_API: + # ... + # if self.data_source == AwsDataSource.ELB: + # ... + return {} + + def get_sources(self) -> tuple[AwsDataSource, FaasTriggerValues]: + # HTTP triggers + if "requestContext" in self.event: + if "apiId" in self.event["requestContext"]: + return (AwsDataSource.API_GATEWAY, FaasTriggerValues.HTTP) + + if "http" in self.event["requestContext"]: + return (AwsDataSource.HTTP_API, FaasTriggerValues.HTTP) + + if "elb" in self.event["requestContext"]: + return (AwsDataSource.ELB, FaasTriggerValues.HTTP) + + # EventBridge + if "source" in self.event and "detail-type" in self.event: + if self.event["detail-type"] == "Scheduled Event": + return (AwsDataSource.EVENT_BRIDGE, FaasTriggerValues.TIMER) + return (AwsDataSource.EVENT_BRIDGE, FaasTriggerValues.PUBSUB) + + # SNS/SQS/S3/DynamoDB/Kinesis + if "Records" in self.event and len(self.event["Records"]) > 0: + record = self.event["Records"][0] + event_source = record.get("eventSource") + + if event_source == "aws:sns": + return (AwsDataSource.SNS, FaasTriggerValues.PUBSUB) + + if event_source == "aws:sqs": + return (AwsDataSource.SQS, FaasTriggerValues.PUBSUB) + + if event_source == "aws:s3": + return (AwsDataSource.S3, FaasTriggerValues.DATASOURCE) + + if event_source == "aws:dynamodb": + return (AwsDataSource.DYNAMODB, FaasTriggerValues.DATASOURCE) + + if event_source == "aws:kinesis": + return (AwsDataSource.KINESIS, FaasTriggerValues.DATASOURCE) + + # CloudWatch Logs + if "awslogs" in self.event and "data" in self.event["awslogs"]: + return (AwsDataSource.CLOUDWATCH_LOGS, FaasTriggerValues.DATASOURCE) + + return (AwsDataSource.OTHER, FaasTriggerValues.OTHER) + + def _get_sqs_attributes(self) -> dict: + records = self.event.get("Records", []) + message_count = len(records) + queue_arn = records[0].get("eventSourceARN", "") if message_count > 0 else "" + queue_name = queue_arn.split(":")[-1] + + return { + MESSAGING_SYSTEM: self.data_source.value, + MESSAGING_OPERATION: MessagingOperationTypeValues.RECEIVE.value, + MESSAGING_BATCH_MESSAGE_COUNT: message_count, + MESSAGING_DESTINATION_NAME: queue_name, + CLOUD_RESOURCE_ID: queue_arn, + } def _check_cold_start() -> bool: diff --git a/tests/test_utils.py b/tests/test_utils.py index c5541dd..4c1dd53 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,15 +3,6 @@ import pytest from opentelemetry.sdk.trace import Span from opentelemetry.semconv._incubating.attributes.faas_attributes import ( - # FAAS_COLDSTART, - # FAAS_INVOCATION_ID, - # FAAS_INVOKED_NAME, - # FAAS_INVOKED_PROVIDER, - # FAAS_INVOKED_REGION, - # FAAS_MAX_MEMORY, - # FAAS_TRIGGER, - # FAAS_VERSION, - # FaasInvokedProviderValues, FaasTriggerValues, ) @@ -43,16 +34,23 @@ def test_cold_start_provisioned_concurrency(self, monkeypatch): class TestLambdaDataSource: @pytest.mark.parametrize( - "key", - [("apiId",), ("httpMethod",), ("elb",)], + "key,aws_data_source", + [ + ("apiId", utils.AwsDataSource.API_GATEWAY), + ("http", utils.AwsDataSource.HTTP_API), + ("elb", utils.AwsDataSource.ELB), + ], ) - def test_http_trigger(self, key: str): + def test_http_trigger(self, key: str, aws_data_source: utils.AwsDataSource): event = { "requestContext": { - "apiId": "example-api-id", + key: "example-api-id", } } - assert utils.get_lambda_datasource(event) == utils.FaasTriggerValues.HTTP + + mapper = utils.DataSourceAttributeMapper(event) + assert mapper.faas_trigger == utils.FaasTriggerValues.HTTP + assert mapper.data_source == aws_data_source @pytest.mark.parametrize( "detail_type, expected", @@ -66,19 +64,35 @@ def test_eventbridge_trigger(self, detail_type: str, expected: FaasTriggerValues "source": "aws.events", "detail-type": detail_type, } - assert utils.get_lambda_datasource(event) == expected + + mapper = utils.DataSourceAttributeMapper(event) + assert mapper.faas_trigger == expected + assert mapper.data_source == utils.AwsDataSource.EVENT_BRIDGE @pytest.mark.parametrize( - "event_source, expected", + "event_source, aws_data_source, faas_trigger", [ - ("aws:sns", utils.FaasTriggerValues.PUBSUB), - ("aws:sqs", utils.FaasTriggerValues.PUBSUB), - ("aws:s3", utils.FaasTriggerValues.DATASOURCE), - ("aws:dynamodb", utils.FaasTriggerValues.DATASOURCE), - ("aws:kinesis", utils.FaasTriggerValues.DATASOURCE), + ("aws:sns", utils.AwsDataSource.SNS, utils.FaasTriggerValues.PUBSUB), + ("aws:sqs", utils.AwsDataSource.SQS, utils.FaasTriggerValues.PUBSUB), + ("aws:s3", utils.AwsDataSource.S3, utils.FaasTriggerValues.DATASOURCE), + ( + "aws:dynamodb", + utils.AwsDataSource.DYNAMODB, + utils.FaasTriggerValues.DATASOURCE, + ), + ( + "aws:kinesis", + utils.AwsDataSource.KINESIS, + utils.FaasTriggerValues.DATASOURCE, + ), ], ) - def test_pubsub_trigger(self, event_source: str, expected: FaasTriggerValues): + def test_pubsub_trigger( + self, + event_source: str, + aws_data_source: utils.AwsDataSource, + faas_trigger: FaasTriggerValues, + ): event = { "Records": [ { @@ -86,7 +100,10 @@ def test_pubsub_trigger(self, event_source: str, expected: FaasTriggerValues): } ] } - assert utils.get_lambda_datasource(event) == expected + + mapper = utils.DataSourceAttributeMapper(event) + assert mapper.faas_trigger == faas_trigger + assert mapper.data_source == aws_data_source def test_cloudwatch_logs_trigger(self): event = { @@ -94,20 +111,25 @@ def test_cloudwatch_logs_trigger(self): "data": "example-data", } } - assert utils.get_lambda_datasource(event) == utils.FaasTriggerValues.DATASOURCE + + mapper = utils.DataSourceAttributeMapper(event) + assert mapper.faas_trigger == utils.FaasTriggerValues.DATASOURCE + assert mapper.data_source == utils.AwsDataSource.CLOUDWATCH_LOGS def test_unknown_trigger(self): event = {} - assert utils.get_lambda_datasource(event) == utils.FaasTriggerValues.OTHER + + mapper = utils.DataSourceAttributeMapper(event) + assert mapper.faas_trigger == utils.FaasTriggerValues.OTHER + assert mapper.data_source == utils.AwsDataSource.OTHER class TestSetLambdaHandlerAttributes: def test_set_attributes(self, lambda_context: LambdaContext): span = MagicMock(spec=Span) - utils.set_lambda_handler_attributes({}, lambda_context, span) + utils.set_handler_attributes({}, lambda_context, span) - span.set_attributes.assert_called_once() attributes = span.set_attributes.call_args[0][0] assert attributes["faas.invocation_id"] == lambda_context.aws_request_id assert attributes["faas.invoked_name"] == lambda_context.function_name From 6fff0a11e88403b543b300de5f6500d27f073ffe Mon Sep 17 00:00:00 2001 From: Amin Alaee Date: Tue, 23 Dec 2025 09:28:51 +0000 Subject: [PATCH 2/2] update --- aws_lambda_opentelemetry/utils.py | 39 ------------------------------- tests/test_utils.py | 28 ++++++++++++++++++++-- 2 files changed, 26 insertions(+), 41 deletions(-) diff --git a/aws_lambda_opentelemetry/utils.py b/aws_lambda_opentelemetry/utils.py index d652b71..c18a5fd 100644 --- a/aws_lambda_opentelemetry/utils.py +++ b/aws_lambda_opentelemetry/utils.py @@ -23,27 +23,6 @@ MESSAGING_SYSTEM, MessagingOperationTypeValues, ) - -# from opentelemetry.semconv.attributes.http_attributes import ( -# HTTP_REQUEST_METHOD, -# HTTP_ROUTE, -# ) -# from opentelemetry.semconv.attributes.server_attributes import ( -# SERVER_ADDRESS, -# ) -# from opentelemetry.semconv.attributes.url_attributes import ( -# URL_PATH, -# URL_SCHEME, -# ) -# from opentelemetry.semconv.attributes.user_agent_attributes import ( -# USER_AGENT_ORIGINAL, -# ) -# from opentelemetry.semconv.attributes.client_attributes import ( -# CLIENT_ADDRESS, -# ) -# from opentelemetry.semconv.attributes.network_attributes import ( -# NETWORK_PROTOCOL_VERSION, -# ) from opentelemetry.trace import Span from aws_lambda_opentelemetry import constants @@ -98,24 +77,6 @@ def __init__(self, event: dict): def attributes(self) -> dict: if self.data_source == AwsDataSource.SQS: return self._get_sqs_attributes() - # if self.data_source == AwsDataSource.SNS: - # ... - # if self.data_source == AwsDataSource.S3: - # ... - # if self.data_source == AwsDataSource.DYNAMODB: - # ... - # if self.data_source == AwsDataSource.KINESIS: - # ... - # if self.data_source == AwsDataSource.EVENT_BRIDGE: - # ... - # if self.data_source == AwsDataSource.CLOUDWATCH_LOGS: - # ... - # if self.data_source == AwsDataSource.API_GATEWAY: - # ... - # if self.data_source == AwsDataSource.HTTP_API: - # ... - # if self.data_source == AwsDataSource.ELB: - # ... return {} def get_sources(self) -> tuple[AwsDataSource, FaasTriggerValues]: diff --git a/tests/test_utils.py b/tests/test_utils.py index 4c1dd53..dfeadb1 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -125,12 +125,12 @@ def test_unknown_trigger(self): class TestSetLambdaHandlerAttributes: - def test_set_attributes(self, lambda_context: LambdaContext): + def test_set_general_attributes(self, lambda_context: LambdaContext): span = MagicMock(spec=Span) utils.set_handler_attributes({}, lambda_context, span) - attributes = span.set_attributes.call_args[0][0] + attributes = span.set_attributes.call_args_list[1][0][0] assert attributes["faas.invocation_id"] == lambda_context.aws_request_id assert attributes["faas.invoked_name"] == lambda_context.function_name assert attributes["faas.invoked_region"] == lambda_context.region @@ -140,3 +140,27 @@ def test_set_attributes(self, lambda_context: LambdaContext): assert attributes["faas.coldstart"] is False assert attributes["faas.trigger"] == "other" assert attributes["cloud.resource_id"] == lambda_context.invoked_function_arn + + def test_sqs_attributes_set(self, lambda_context: LambdaContext): + span = MagicMock(spec=Span) + + event = { + "Records": [ + { + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue", + "awsRegion": "us-east-1", + } + ] + } + + utils.set_handler_attributes(event, lambda_context, span) + + attributes = span.set_attributes.call_args_list[0][0][0] + assert attributes["messaging.system"] == "aws.sqs" + assert attributes["messaging.destination.name"] == "queue" + assert attributes["messaging.operation"] == "receive" + assert ( + attributes["cloud.resource_id"] + == "arn:aws:sqs:us-east-1:123456789012:queue" + )