From bde0377686e3cc5ef28a55b92800eb9d06bac0a4 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Fri, 26 Dec 2025 14:44:41 -0500 Subject: [PATCH] refactor: improve telemetry api receiver listener initialization logic --- .../receiver/telemetryapireceiver/factory.go | 2 +- .../receiver/telemetryapireceiver/receiver.go | 59 ++++++++++++------- .../telemetryapireceiver/receiver_test.go | 8 +-- 3 files changed, 43 insertions(+), 26 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index b5ea6a3b13..01cfaf4d36 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -27,7 +27,7 @@ import ( const ( typeStr = "telemetryapi" stability = component.StabilityLevelDevelopment - defaultPort = 4325 + defaultPort = 0 platform = "platform" function = "function" extension = "extension" diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 7b1f8b356a..43d97b0d96 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -18,11 +18,12 @@ import ( "context" crand "crypto/rand" "encoding/json" + "errors" "fmt" "io" + "net" "net/http" "os" - "strconv" "strings" "sync" "time" @@ -75,30 +76,52 @@ type telemetryAPIReceiver struct { logReport bool } -func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error { - address := listenOnAddress(r.port) - r.logger.Info("Listening for requests", zap.String("address", address)) +func (r *telemetryAPIReceiver) Start(ctx context.Context, _ component.Host) error { + if len(r.types) == 0 { + return fmt.Errorf("no telemetry event types provided") + } + listener, address, err := r.bindListener(ctx) + if err != nil { + return fmt.Errorf("failed to bind listener: %w", err) + } + r.logger.Info("Starting telemetry API listener", zap.String("address", address)) mux := http.NewServeMux() mux.HandleFunc("/", r.httpHandler) r.httpServer = &http.Server{Addr: address, Handler: mux} go func() { - _ = r.httpServer.ListenAndServe() + err := r.httpServer.Serve(listener) + if !errors.Is(err, http.ErrServerClosed) { + r.logger.Error("Telemetry API server stopped unexpectedly", zap.Error(err)) + } else { + r.logger.Info("Telemetry API server stopped", zap.String("address", address)) + } }() telemetryClient := telemetryapi.NewClient(r.logger) - if len(r.types) > 0 { - _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) - if err != nil { - r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) - return err - } + if _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)); err != nil { + r.logger.Error("Failed to subscribe to telemetry", zap.Error(err)) + _ = r.Shutdown(ctx) + return err } + r.logger.Info("Successfully subscribed to telemetry", zap.String("address", address)) return nil } +func (r *telemetryAPIReceiver) bindListener(ctx context.Context) (net.Listener, string, error) { + listenerAddr := listenOnAddress() + var lc net.ListenConfig + l, err := lc.Listen(ctx, "tcp", fmt.Sprintf("%s:%d", listenerAddr, r.port)) + if err != nil { + return nil, "", err + } + addr := fmt.Sprintf("%s:%d", l.Addr().Network(), l.Addr().(*net.TCPAddr).Port) + return l, addr, nil +} + func (r *telemetryAPIReceiver) Shutdown(ctx context.Context) error { - return nil + err := r.httpServer.Shutdown(ctx) + return err } func newSpanID() pcommon.SpanID { @@ -192,9 +215,6 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ } } } - - r.logger.Debug("logEvents received", zap.Int("count", len(slice)), zap.Int64("queue_length", r.queue.Len())) - slice = nil } func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{}) string { @@ -534,14 +554,11 @@ func newTelemetryAPIReceiver( }, nil } -func listenOnAddress(port int) string { +func listenOnAddress() string { envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL") - var addr string if ok && envAwsLocal == "true" { - addr = ":" + strconv.Itoa(port) + return "" } else { - addr = "sandbox.localdomain:" + strconv.Itoa(port) + return "sandbox.localdomain" } - - return addr } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index b3beafd2bd..1762579093 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -38,16 +38,16 @@ func TestListenOnAddress(t *testing.T) { { desc: "listen on address without AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { - addr := listenOnAddress(4325) - require.EqualValues(t, "sandbox.localdomain:4325", addr) + addr := listenOnAddress() + require.EqualValues(t, "sandbox.localdomain", addr) }, }, { desc: "listen on address with AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { t.Setenv("AWS_SAM_LOCAL", "true") - addr := listenOnAddress(4325) - require.EqualValues(t, ":4325", addr) + addr := listenOnAddress() + require.EqualValues(t, "", addr) }, }, }