diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go index 4079ff994e..00736593b9 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go @@ -24,6 +24,7 @@ import ( "fmt" "strings" + "github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle" "go.opentelemetry.io/collector/confmap" ) @@ -43,6 +44,14 @@ func New() confmap.Converter { } func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { + initType := lambdalifecycle.InitTypeFromEnv(lambdalifecycle.InitTypeEnvVar) + + // Do not append decouple processors for Lambda Managed Instances + // see: https://docs.aws.amazon.com/lambda/latest/dg/lambda-managed-instances-execution-environment.html + if initType == lambdalifecycle.LambdaManagedInstances { + return nil + } + serviceVal := conf.Get(serviceKey) service, ok := serviceVal.(map[string]interface{}) if !ok { diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go index 26ccd2c277..967f3a5945 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go @@ -17,6 +17,7 @@ import ( "context" "testing" + "github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle" "go.opentelemetry.io/collector/confmap" "github.com/google/go-cmp/cmp" @@ -151,3 +152,38 @@ func TestConvert(t *testing.T) { }) } } + +func TestConvert_LambdaManagedInstances(t *testing.T) { + t.Setenv(lambdalifecycle.InitTypeEnvVar, lambdalifecycle.LambdaManagedInstances.String()) + + // Config that would normally have decouple appended + input := confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": []interface{}{"batch"}, + }, + }, + }, + }) + + // Expected to remain unchanged + expected := confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": []interface{}{"batch"}, + }, + }, + }, + }) + + c := New() + err := c.Convert(context.Background(), input) + if err != nil { + t.Errorf("unexpected error converting: %v", err) + } + if diff := cmp.Diff(expected.ToStringMap(), input.ToStringMap()); diff != "" { + t.Errorf("Convert() mismatch: (-want +got):\n%s", diff) + } +} diff --git a/collector/internal/extensionapi/client.go b/collector/internal/extensionapi/client.go index 7210a07efa..8403eb4955 100644 --- a/collector/internal/extensionapi/client.go +++ b/collector/internal/extensionapi/client.go @@ -76,15 +76,17 @@ type Client struct { httpClient *http.Client extensionID string logger *zap.Logger + events []EventType } // NewClient returns a Lambda Extensions API client. -func NewClient(logger *zap.Logger, awsLambdaRuntimeAPI string) *Client { +func NewClient(logger *zap.Logger, awsLambdaRuntimeAPI string, events []EventType) *Client { baseURL := fmt.Sprintf("http://%s/2020-01-01/extension", awsLambdaRuntimeAPI) return &Client{ baseURL: baseURL, httpClient: &http.Client{}, logger: logger.Named("extensionAPI.Client"), + events: events, } } @@ -94,7 +96,7 @@ func (e *Client) Register(ctx context.Context, filename string) (*RegisterRespon url := e.baseURL + action reqBody, err := json.Marshal(map[string]interface{}{ - "events": []EventType{Invoke, Shutdown}, + "events": e.events, }) if err != nil { return nil, err diff --git a/collector/internal/lifecycle/constants.go b/collector/internal/lifecycle/constants.go new file mode 100644 index 0000000000..506c28e7b2 --- /dev/null +++ b/collector/internal/lifecycle/constants.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lifecycle + +const RuntimeApiEnvVar = "AWS_LAMBDA_RUNTIME_API" diff --git a/collector/internal/lifecycle/manager.go b/collector/internal/lifecycle/manager.go index 052c45f671..ddd90820bf 100644 --- a/collector/internal/lifecycle/manager.go +++ b/collector/internal/lifecycle/manager.go @@ -17,13 +17,14 @@ package lifecycle import ( "context" "fmt" - "github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle" "os" "os/signal" "path/filepath" "sync" "syscall" + "github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle" + "go.uber.org/multierr" "go.uber.org/zap" @@ -49,6 +50,7 @@ type manager struct { listener *telemetryapi.Listener wg sync.WaitGroup lifecycleListeners []lambdalifecycle.Listener + initType lambdalifecycle.InitType } func NewManager(ctx context.Context, logger *zap.Logger, version string) (context.Context, *manager) { @@ -62,28 +64,40 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex logger.Info("received signal", zap.String("signal", s.String())) }() - extensionClient := extensionapi.NewClient(logger, os.Getenv("AWS_LAMBDA_RUNTIME_API")) + var extensionEvents []extensionapi.EventType + initType := lambdalifecycle.InitTypeFromEnv(lambdalifecycle.InitTypeEnvVar) + if initType == lambdalifecycle.LambdaManagedInstances { + extensionEvents = []extensionapi.EventType{extensionapi.Shutdown} + } else { + extensionEvents = []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown} + } + + extensionClient := extensionapi.NewClient(logger, os.Getenv(RuntimeApiEnvVar), extensionEvents) res, err := extensionClient.Register(ctx, extensionName) if err != nil { logger.Fatal("Cannot register extension", zap.Error(err)) } - listener := telemetryapi.NewListener(logger) - addr, err := listener.Start() - if err != nil { - logger.Fatal("Cannot start Telemetry API Listener", zap.Error(err)) - } + var listener *telemetryapi.Listener + if initType != lambdalifecycle.LambdaManagedInstances { + listener = telemetryapi.NewListener(logger) + addr, err := listener.Start() + if err != nil { + logger.Fatal("Cannot start Telemetry API Listener", zap.Error(err)) + } - telemetryClient := telemetryapi.NewClient(logger) - _, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr) - if err != nil { - logger.Fatal("Cannot register Telemetry API client", zap.Error(err)) + telemetryClient := telemetryapi.NewClient(logger) + _, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr) + if err != nil { + logger.Fatal("Cannot register Telemetry API client", zap.Error(err)) + } } lm := &manager{ logger: logger.Named("lifecycle.manager"), extensionClient: extensionClient, listener: listener, + initType: initType, } factories, _ := lambdacomponents.Components(res.ExtensionID) @@ -134,7 +148,9 @@ func (lm *manager) processEvents(ctx context.Context) error { if res.EventType == extensionapi.Shutdown { lm.logger.Info("Received SHUTDOWN event") lm.notifyEnvironmentShutdown() - lm.listener.Shutdown() + if lm.listener != nil { + lm.listener.Shutdown() + } err = lm.collector.Stop() if err != nil { if _, exitErr := lm.extensionClient.ExitError(ctx, fmt.Sprintf("error stopping collector: %v", err)); exitErr != nil { @@ -142,17 +158,17 @@ func (lm *manager) processEvents(ctx context.Context) error { } } return err - } + } else if lm.listener != nil && res.EventType == extensionapi.Invoke { + lm.notifyFunctionInvoked() - lm.notifyFunctionInvoked() + err = lm.listener.Wait(ctx, res.RequestID) + if err != nil { + lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID)) + } - err = lm.listener.Wait(ctx, res.RequestID) - if err != nil { - lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID)) + // Check other components are ready before allowing the freezing of the environment. + lm.notifyFunctionFinished() } - - // Check other components are ready before allowing the freezing of the environment. - lm.notifyFunctionFinished() } } } diff --git a/collector/internal/lifecycle/manager_test.go b/collector/internal/lifecycle/manager_test.go index e121779552..e010276919 100644 --- a/collector/internal/lifecycle/manager_test.go +++ b/collector/internal/lifecycle/manager_test.go @@ -56,11 +56,12 @@ func TestRun(t *testing.T) { u, err := url.Parse(server.URL) require.NoError(t, err) + extensionEventTypes := []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown} // test with an error lm := manager{ collector: &MockCollector{err: fmt.Errorf("test start error")}, logger: logger, - extensionClient: extensionapi.NewClient(logger, ""), + extensionClient: extensionapi.NewClient(logger, "", extensionEventTypes), } require.Error(t, lm.Run(ctx)) // test with no waitgroup @@ -68,7 +69,7 @@ func TestRun(t *testing.T) { collector: &MockCollector{}, logger: logger, listener: telemetryapi.NewListener(logger), - extensionClient: extensionapi.NewClient(logger, u.Host), + extensionClient: extensionapi.NewClient(logger, u.Host, extensionEventTypes), } require.NoError(t, lm.Run(ctx)) // test with waitgroup counter incremented @@ -76,7 +77,7 @@ func TestRun(t *testing.T) { collector: &MockCollector{}, logger: logger, listener: telemetryapi.NewListener(logger), - extensionClient: extensionapi.NewClient(logger, u.Host), + extensionClient: extensionapi.NewClient(logger, u.Host, extensionEventTypes), } lm.wg.Add(1) go func() { @@ -142,7 +143,7 @@ func TestProcessEvents(t *testing.T) { collector: &MockCollector{err: tc.collectorError}, logger: logger, listener: telemetryapi.NewListener(logger), - extensionClient: extensionapi.NewClient(logger, u.Host), + extensionClient: extensionapi.NewClient(logger, u.Host, []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown}), } lm.wg.Add(1) if tc.err != nil { @@ -152,7 +153,6 @@ func TestProcessEvents(t *testing.T) { } else { require.NoError(t, lm.processEvents(ctx)) } - }) } diff --git a/collector/lambdalifecycle/constants.go b/collector/lambdalifecycle/constants.go new file mode 100644 index 0000000000..0854e088ec --- /dev/null +++ b/collector/lambdalifecycle/constants.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lambdalifecycle + +const InitTypeEnvVar = "AWS_LAMBDA_INITIALIZATION_TYPE" diff --git a/collector/lambdalifecycle/go.mod b/collector/lambdalifecycle/go.mod index 79aa02e218..27f0704919 100644 --- a/collector/lambdalifecycle/go.mod +++ b/collector/lambdalifecycle/go.mod @@ -1,3 +1,11 @@ module github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle go 1.24.4 + +require github.com/stretchr/testify v1.11.1 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/collector/lambdalifecycle/go.sum b/collector/lambdalifecycle/go.sum new file mode 100644 index 0000000000..c4c1710c47 --- /dev/null +++ b/collector/lambdalifecycle/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/collector/lambdalifecycle/types.go b/collector/lambdalifecycle/types.go new file mode 100644 index 0000000000..c2602429c4 --- /dev/null +++ b/collector/lambdalifecycle/types.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lambdalifecycle + +import "os" + +type InitType int + +const ( + OnDemand InitType = iota + ProvisionedConcurrency + SnapStart + LambdaManagedInstances + Unknown InitType = -1 +) + +func (t InitType) String() string { + switch t { + case OnDemand: + return "on-demand" + case ProvisionedConcurrency: + return "provisioned-concurrency" + case SnapStart: + return "snap-start" + case LambdaManagedInstances: + return "lambda-managed-instances" + default: + return "unknown" + } +} + +func ParseInitType(s string) InitType { + switch s { + case "on-demand": + return OnDemand + case "provisioned-concurrency": + return ProvisionedConcurrency + case "snap-start": + return SnapStart + case "lambda-managed-instances": + return LambdaManagedInstances + default: + return Unknown + } +} + +func InitTypeFromEnv(envVar string) InitType { + return ParseInitType(os.Getenv(envVar)) +} diff --git a/collector/lambdalifecycle/types_test.go b/collector/lambdalifecycle/types_test.go new file mode 100644 index 0000000000..d935a6428e --- /dev/null +++ b/collector/lambdalifecycle/types_test.go @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lambdalifecycle + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestInitType_String(t *testing.T) { + tests := []struct { + initType InitType + expected string + }{ + {OnDemand, "on-demand"}, + {ProvisionedConcurrency, "provisioned-concurrency"}, + {SnapStart, "snap-start"}, + {LambdaManagedInstances, "lambda-managed-instances"}, + {Unknown, "unknown"}, + {InitType(99), "unknown"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + if result := tt.initType.String(); result != tt.expected { + t.Errorf("InitType.String() = %q, expected %q", result, tt.expected) + } + }) + } +} + +func TestParseInitType(t *testing.T) { + tests := []struct { + input string + expected InitType + }{ + {"on-demand", OnDemand}, + {"provisioned-concurrency", ProvisionedConcurrency}, + {"snap-start", SnapStart}, + {"lambda-managed-instances", LambdaManagedInstances}, + {"unknown", Unknown}, + {"", Unknown}, + {"invalid", Unknown}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + if result := ParseInitType(tt.input); result != tt.expected { + t.Errorf("ParseInitType(%q) = %v, expected %v", tt.input, result, tt.expected) + } + }) + } +} + +func TestInitTypeFromEnv(t *testing.T) { + const testEnvVar = "TEST_INIT_TYPE" + + tests := []struct { + name string + envVal string + expected InitType + setEnv bool + }{ + {"on-demand", "on-demand", OnDemand, true}, + {"provisioned-concurrency", "provisioned-concurrency", ProvisionedConcurrency, true}, + {"snap-start", "snap-start", SnapStart, true}, + {"lambda-managed-instances", "lambda-managed-instances", LambdaManagedInstances, true}, + {"unset env var", "", Unknown, false}, + {"empty env var", "", Unknown, true}, + {"invalid value", "foo", Unknown, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.NoError(t, os.Unsetenv(testEnvVar)) + if tt.setEnv { + require.NoError(t, os.Setenv(testEnvVar, tt.envVal)) + } + + if result := InitTypeFromEnv(testEnvVar); result != tt.expected { + t.Errorf("InitTypeFromEnv() = %v, expected %v", result, tt.expected) + } + }) + } +} diff --git a/collector/receiver/telemetryapireceiver/go.mod b/collector/receiver/telemetryapireceiver/go.mod index daac77d6fa..90b2020da6 100644 --- a/collector/receiver/telemetryapireceiver/go.mod +++ b/collector/receiver/telemetryapireceiver/go.mod @@ -4,9 +4,12 @@ go 1.24.4 replace github.com/open-telemetry/opentelemetry-lambda/collector => ../../ +replace github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle => ../../lambdalifecycle + require ( github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 github.com/open-telemetry/opentelemetry-lambda/collector v0.98.0 + github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.48.0 go.opentelemetry.io/collector/component/componenttest v0.142.0 diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 7b1f8b356a..2354909d9d 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -28,6 +28,8 @@ import ( "time" "github.com/golang-collections/go-datastructures/queue" + "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" + "github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -36,14 +38,12 @@ import ( "go.opentelemetry.io/collector/receiver" semconv "go.opentelemetry.io/collector/semconv/v1.25.0" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" ) const ( initialQueueSize = 5 scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" - platformReportLogFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB" + platformReportLogFmt = "REPORT RequestId: %s" platformStartLogFmt = "START RequestId: %s Version: %s" platformRuntimeDoneLogFmt = "END RequestId: %s Version: %s" platformInitStartLogFmt = "INIT_START Runtime Version: %s Runtime Version ARN: %s" @@ -72,6 +72,7 @@ type telemetryAPIReceiver struct { resource pcommon.Resource faasFunctionVersion string currentFaasInvocationID string + lambdaInitType lambdalifecycle.InitType logReport bool } @@ -140,12 +141,16 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ switch el.Type { // Function initialization started. case string(telemetryapi.PlatformInitStart): - r.logger.Info(fmt.Sprintf("Init start: %s", r.lastPlatformStartTime), zap.Any("event", el)) - r.lastPlatformStartTime = el.Time + if el.Time != "" { + r.lastPlatformStartTime = el.Time + r.logger.Info(fmt.Sprintf("Init start: %s", r.lastPlatformStartTime), zap.Any("event", el)) + } // Function initialization completed. - case string(telemetryapi.PlatformInitRuntimeDone): - r.logger.Info(fmt.Sprintf("Init end: %s", r.lastPlatformEndTime), zap.Any("event", el)) - r.lastPlatformEndTime = el.Time + case string(telemetryapi.PlatformInitRuntimeDone), string(telemetryapi.PlatformInitReport): + if r.lastPlatformStartTime != "" && el.Time != "" { + r.lastPlatformEndTime = el.Time + r.logger.Info(fmt.Sprintf("Init end: %s", r.lastPlatformEndTime), zap.Any("event", el)) + } } // TODO: add support for additional events, see https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html // A report of function initialization. @@ -167,6 +172,7 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ // Lambda dropped log entries. // case "platform.logsDropped": } + if len(r.lastPlatformStartTime) > 0 && len(r.lastPlatformEndTime) > 0 { if td, err := r.createPlatformInitSpan(r.lastPlatformStartTime, r.lastPlatformEndTime); err == nil { if r.nextTraces != nil { @@ -198,14 +204,28 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ } func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{}) string { - if requestId, ok := record["requestId"].(string); ok { - return requestId - } else if r.currentFaasInvocationID != "" { + if record != nil { + if requestId, ok := record["requestId"].(string); ok { + return requestId + } + } + + return "" +} + +func (r *telemetryAPIReceiver) getCurrentRequestId() string { + if r.lambdaInitType != lambdalifecycle.LambdaManagedInstances { return r.currentFaasInvocationID } return "" } +func (r *telemetryAPIReceiver) updateCurrentRequestId(requestId string) { + if r.lambdaInitType != lambdalifecycle.LambdaManagedInstances { + r.currentFaasInvocationID = requestId + } +} + func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { log := plog.NewLogs() resourceLog := log.ResourceLogs().AppendEmpty() @@ -228,14 +248,19 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { } if record, ok := el.Record.(map[string]interface{}); ok { requestId := r.getRecordRequestId(record) + + // If this is the first event in the invocation with a request id (i.e. the "platform.start" event), + // set the current invocation id to this request id. + if requestId != "" && el.Type == string(telemetryapi.PlatformStart) { + r.updateCurrentRequestId(requestId) + } + + if requestId == "" { + requestId = r.getCurrentRequestId() + } + if requestId != "" { logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) - - // If this is the first event in the invocation with a request id (i.e. the "platform.start" event), - // set the current invocation id to this request id. - if el.Type == string(telemetryapi.PlatformStart) { - r.currentFaasInvocationID = requestId - } } // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function @@ -258,6 +283,10 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { if functionVersion != "" { r.faasFunctionVersion = functionVersion } + } else if el.Type == string(telemetryapi.PlatformStart) { + if version, _ := record["version"].(string); version != "" { + r.faasFunctionVersion = version + } } message := createPlatformMessage(requestId, r.faasFunctionVersion, el.Type, record) @@ -268,8 +297,13 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { logRecord.Body().SetStr(line) } } else { - if r.currentFaasInvocationID != "" { - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID) + requestId := r.getRecordRequestId(nil) + if requestId == "" { + requestId = r.getCurrentRequestId() + } + + if requestId != "" { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) } // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function if line, ok := el.Record.(string); ok { @@ -277,7 +311,7 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { } } if el.Type == string(telemetryapi.PlatformRuntimeDone) { - r.currentFaasInvocationID = "" + r.updateCurrentRequestId("") } } return log, nil @@ -370,42 +404,33 @@ func createPlatformMessage(requestId string, functionVersion string, eventType s } func createPlatformReportMessage(requestId string, record map[string]interface{}) string { - // gathering metrics - metrics, ok := record["metrics"].(map[string]interface{}) - if !ok { + if requestId == "" { return "" } - var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64 - if durationMs, ok = metrics[string(telemetryapi.MetricDurationMs)].(float64); !ok { - return "" + + message := fmt.Sprintf(platformReportLogFmt, requestId) + metrics, ok := record["metrics"].(map[string]interface{}) + if !ok { + return message } - if billedDurationMs, ok = metrics[string(telemetryapi.MetricBilledDurationMs)].(float64); !ok { - return "" + + if durationMs, ok := metrics["durationMs"].(float64); ok { + message += fmt.Sprintf(" Duration: %.2f ms", durationMs) } - if memorySizeMB, ok = metrics[string(telemetryapi.MetricMemorySizeMB)].(float64); !ok { - return "" + + if billedDurationMs, ok := metrics["billedDurationMs"].(float64); ok { + message += fmt.Sprintf(" Billed Duration: %.0f ms", billedDurationMs) } - if maxMemoryUsedMB, ok = metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); !ok { - return "" + + if memorySizeMB, ok := metrics["memorySizeMB"].(float64); ok { + message += fmt.Sprintf(" Memory Size: %.0f MB", memorySizeMB) } - // optionally gather information about cold start time - var initDurationMs float64 - if initDurationMsVal, exists := metrics[string(telemetryapi.MetricInitDurationMs)]; exists { - if val, ok := initDurationMsVal.(float64); ok { - initDurationMs = val - } + if maxMemoryUsedMB, ok := metrics["maxMemoryUsedMB"].(float64); ok { + message += fmt.Sprintf(" Max Memory Used: %.0f MB", maxMemoryUsedMB) } - message := fmt.Sprintf( - platformReportLogFmt, - requestId, - durationMs, - billedDurationMs, - memorySizeMB, - maxMemoryUsedMB, - ) - if initDurationMs > 0 { + if initDurationMs, ok := metrics["initDurationMs"].(float64); ok { message += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs) } @@ -523,14 +548,17 @@ func newTelemetryAPIReceiver( } } + lambdaInitType := lambdalifecycle.InitTypeFromEnv(lambdalifecycle.InitTypeEnvVar) + return &telemetryAPIReceiver{ - logger: set.Logger, - queue: queue.New(initialQueueSize), - extensionID: cfg.extensionID, - port: cfg.Port, - types: subscribedTypes, - resource: r, - logReport: cfg.LogReport, + logger: set.Logger, + queue: queue.New(initialQueueSize), + extensionID: cfg.extensionID, + port: cfg.Port, + types: subscribedTypes, + resource: r, + lambdaInitType: lambdaInitType, + logReport: cfg.LogReport, }, nil } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index b3beafd2bd..0ed684764d 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -969,7 +969,7 @@ func TestCreatePlatformMessage(t *testing.T) { functionVersion: "$LATEST", eventType: "platform.report", record: map[string]interface{}{}, - expected: "", + expected: "REPORT RequestId: test-request-id", }, { desc: "platform.initStart with runtimeVersion and runtimeVersionArn",