Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"strings"

"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
"go.opentelemetry.io/collector/confmap"
)

Expand All @@ -43,6 +44,14 @@ func New() confmap.Converter {
}

func (c converter) Convert(_ context.Context, conf *confmap.Conf) error {
initType := lambdalifecycle.InitTypeFromEnv(lambdalifecycle.InitTypeEnvVar)

// Do not append decouple processors for Lambda Managed Instances
// see: https://docs.aws.amazon.com/lambda/latest/dg/lambda-managed-instances-execution-environment.html
if initType == lambdalifecycle.LambdaManagedInstances {
return nil
}

serviceVal := conf.Get(serviceKey)
service, ok := serviceVal.(map[string]interface{})
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"testing"

"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
"go.opentelemetry.io/collector/confmap"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -151,3 +152,38 @@ func TestConvert(t *testing.T) {
})
}
}

func TestConvert_LambdaManagedInstances(t *testing.T) {
t.Setenv(lambdalifecycle.InitTypeEnvVar, lambdalifecycle.LambdaManagedInstances.String())

// Config that would normally have decouple appended
input := confmap.NewFromStringMap(map[string]interface{}{
"service": map[string]interface{}{
"pipelines": map[string]interface{}{
"traces": map[string]interface{}{
"processors": []interface{}{"batch"},
},
},
},
})

// Expected to remain unchanged
expected := confmap.NewFromStringMap(map[string]interface{}{
"service": map[string]interface{}{
"pipelines": map[string]interface{}{
"traces": map[string]interface{}{
"processors": []interface{}{"batch"},
},
},
},
})

c := New()
err := c.Convert(context.Background(), input)
if err != nil {
t.Errorf("unexpected error converting: %v", err)
}
if diff := cmp.Diff(expected.ToStringMap(), input.ToStringMap()); diff != "" {
t.Errorf("Convert() mismatch: (-want +got):\n%s", diff)
}
}
6 changes: 4 additions & 2 deletions collector/internal/extensionapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,17 @@ type Client struct {
httpClient *http.Client
extensionID string
logger *zap.Logger
events []EventType
}

// NewClient returns a Lambda Extensions API client.
func NewClient(logger *zap.Logger, awsLambdaRuntimeAPI string) *Client {
func NewClient(logger *zap.Logger, awsLambdaRuntimeAPI string, events []EventType) *Client {
baseURL := fmt.Sprintf("http://%s/2020-01-01/extension", awsLambdaRuntimeAPI)
return &Client{
baseURL: baseURL,
httpClient: &http.Client{},
logger: logger.Named("extensionAPI.Client"),
events: events,
}
}

Expand All @@ -94,7 +96,7 @@ func (e *Client) Register(ctx context.Context, filename string) (*RegisterRespon
url := e.baseURL + action

reqBody, err := json.Marshal(map[string]interface{}{
"events": []EventType{Invoke, Shutdown},
"events": e.events,
})
if err != nil {
return nil, err
Expand Down
17 changes: 17 additions & 0 deletions collector/internal/lifecycle/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lifecycle

const RuntimeApiEnvVar = "AWS_LAMBDA_RUNTIME_API"
56 changes: 36 additions & 20 deletions collector/internal/lifecycle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ package lifecycle
import (
"context"
"fmt"
"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"

"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"

"go.uber.org/multierr"
"go.uber.org/zap"

Expand All @@ -49,6 +50,7 @@ type manager struct {
listener *telemetryapi.Listener
wg sync.WaitGroup
lifecycleListeners []lambdalifecycle.Listener
initType lambdalifecycle.InitType
}
Comment on lines 50 to 54
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new manager.initType field is assigned in NewManager but never read anywhere, so it adds state without affecting behavior. Either remove the field or use it in processEvents()/other methods so it serves a purpose (e.g., for logging, guarding listener usage, or future branching).

Copilot uses AI. Check for mistakes.

func NewManager(ctx context.Context, logger *zap.Logger, version string) (context.Context, *manager) {
Expand All @@ -62,28 +64,40 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex
logger.Info("received signal", zap.String("signal", s.String()))
}()

extensionClient := extensionapi.NewClient(logger, os.Getenv("AWS_LAMBDA_RUNTIME_API"))
var extensionEvents []extensionapi.EventType
initType := lambdalifecycle.InitTypeFromEnv(lambdalifecycle.InitTypeEnvVar)
if initType == lambdalifecycle.LambdaManagedInstances {
extensionEvents = []extensionapi.EventType{extensionapi.Shutdown}
} else {
extensionEvents = []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown}
}
Comment on lines +67 to +73
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Lambda managed-instances branching in NewManager (event subscription selection + skipping Telemetry API listener startup) is new behavior but isn't covered by unit tests in this package. Add a test that sets AWS_LAMBDA_INITIALIZATION_TYPE to lambda-managed-instances and verifies the extension client is registered with only SHUTDOWN and that listener/Wait + FunctionInvoked/Finished paths are not used.

Copilot uses AI. Check for mistakes.

extensionClient := extensionapi.NewClient(logger, os.Getenv(RuntimeApiEnvVar), extensionEvents)
res, err := extensionClient.Register(ctx, extensionName)
if err != nil {
logger.Fatal("Cannot register extension", zap.Error(err))
}

listener := telemetryapi.NewListener(logger)
addr, err := listener.Start()
if err != nil {
logger.Fatal("Cannot start Telemetry API Listener", zap.Error(err))
}
var listener *telemetryapi.Listener
if initType != lambdalifecycle.LambdaManagedInstances {
listener = telemetryapi.NewListener(logger)
addr, err := listener.Start()
if err != nil {
logger.Fatal("Cannot start Telemetry API Listener", zap.Error(err))
}

telemetryClient := telemetryapi.NewClient(logger)
_, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr)
if err != nil {
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
telemetryClient := telemetryapi.NewClient(logger)
_, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr)
if err != nil {
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
}
}

lm := &manager{
logger: logger.Named("lifecycle.manager"),
extensionClient: extensionClient,
listener: listener,
initType: initType,
}

factories, _ := lambdacomponents.Components(res.ExtensionID)
Expand Down Expand Up @@ -134,25 +148,27 @@ func (lm *manager) processEvents(ctx context.Context) error {
if res.EventType == extensionapi.Shutdown {
lm.logger.Info("Received SHUTDOWN event")
lm.notifyEnvironmentShutdown()
lm.listener.Shutdown()
if lm.listener != nil {
lm.listener.Shutdown()
}
err = lm.collector.Stop()
if err != nil {
if _, exitErr := lm.extensionClient.ExitError(ctx, fmt.Sprintf("error stopping collector: %v", err)); exitErr != nil {
return multierr.Combine(err, exitErr)
}
}
return err
}
} else if lm.listener != nil && res.EventType == extensionapi.Invoke {
lm.notifyFunctionInvoked()

lm.notifyFunctionInvoked()
err = lm.listener.Wait(ctx, res.RequestID)
if err != nil {
lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID))
}

err = lm.listener.Wait(ctx, res.RequestID)
if err != nil {
lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID))
// Check other components are ready before allowing the freezing of the environment.
lm.notifyFunctionFinished()
}

// Check other components are ready before allowing the freezing of the environment.
lm.notifyFunctionFinished()
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions collector/internal/lifecycle/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,28 @@ func TestRun(t *testing.T) {
u, err := url.Parse(server.URL)
require.NoError(t, err)

extensionEventTypes := []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown}
// test with an error
lm := manager{
collector: &MockCollector{err: fmt.Errorf("test start error")},
logger: logger,
extensionClient: extensionapi.NewClient(logger, ""),
extensionClient: extensionapi.NewClient(logger, "", extensionEventTypes),
}
require.Error(t, lm.Run(ctx))
// test with no waitgroup
lm = manager{
collector: &MockCollector{},
logger: logger,
listener: telemetryapi.NewListener(logger),
extensionClient: extensionapi.NewClient(logger, u.Host),
extensionClient: extensionapi.NewClient(logger, u.Host, extensionEventTypes),
}
require.NoError(t, lm.Run(ctx))
// test with waitgroup counter incremented
lm = manager{
collector: &MockCollector{},
logger: logger,
listener: telemetryapi.NewListener(logger),
extensionClient: extensionapi.NewClient(logger, u.Host),
extensionClient: extensionapi.NewClient(logger, u.Host, extensionEventTypes),
}
lm.wg.Add(1)
go func() {
Expand Down Expand Up @@ -142,7 +143,7 @@ func TestProcessEvents(t *testing.T) {
collector: &MockCollector{err: tc.collectorError},
logger: logger,
listener: telemetryapi.NewListener(logger),
extensionClient: extensionapi.NewClient(logger, u.Host),
extensionClient: extensionapi.NewClient(logger, u.Host, []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown}),
}
lm.wg.Add(1)
if tc.err != nil {
Expand All @@ -152,7 +153,6 @@ func TestProcessEvents(t *testing.T) {
} else {
require.NoError(t, lm.processEvents(ctx))
}

})
}

Expand Down
17 changes: 17 additions & 0 deletions collector/lambdalifecycle/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lambdalifecycle

const InitTypeEnvVar = "AWS_LAMBDA_INITIALIZATION_TYPE"
8 changes: 8 additions & 0 deletions collector/lambdalifecycle/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
module github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle

go 1.24.11

require github.com/stretchr/testify v1.11.1

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions collector/lambdalifecycle/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
61 changes: 61 additions & 0 deletions collector/lambdalifecycle/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lambdalifecycle

import "os"

type InitType int

const (
OnDemand InitType = iota
ProvisionedConcurrency
SnapStart
LambdaManagedInstances
Unknown InitType = -1
)

func (t InitType) String() string {
switch t {
case OnDemand:
return "on-demand"
case ProvisionedConcurrency:
return "provisioned-concurrency"
case SnapStart:
return "snap-start"
case LambdaManagedInstances:
return "lambda-managed-instances"
default:
return "unknown"
}
}

func ParseInitType(s string) InitType {
switch s {
case "on-demand":
return OnDemand
case "provisioned-concurrency":
return ProvisionedConcurrency
case "snap-start":
return SnapStart
case "lambda-managed-instances":
return LambdaManagedInstances
default:
return Unknown
}
}

func InitTypeFromEnv(envVar string) InitType {
return ParseInitType(os.Getenv(envVar))
}
Loading
Loading