From a08abb22860cc13e7813c95925fab174fd7c9c39 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Sun, 7 Dec 2025 19:12:51 -0500 Subject: [PATCH 01/12] initial changes to export telemetry api logs --- .../receiver/telemetryapireceiver/receiver.go | 146 ++++++++---------- 1 file changed, 67 insertions(+), 79 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index df5d08a3a1..7be02eb5cb 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -188,6 +188,15 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ slice = nil } +func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{}) string { + if requestId, ok := record["requestId"].(string); ok { + return requestId + } else if r.currentFaasInvocationID != "" { + return r.currentFaasInvocationID + } + return "" +} + func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { log := plog.NewLogs() resourceLog := log.ResourceLogs().AppendEmpty() @@ -196,92 +205,83 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { scopeLog.Scope().SetName(scopeName) for _, el := range slice { r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) - if el.Type == string(telemetryapi.Function) || el.Type == string(telemetryapi.Extension) { - logRecord := scopeLog.LogRecords().AppendEmpty() - logRecord.Attributes().PutStr("type", el.Type) - if t, err := time.Parse(time.RFC3339, el.Time); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) - } else { - r.logger.Error("error parsing time", zap.Error(err)) - return plog.Logs{}, err - } - if record, ok := el.Record.(map[string]interface{}); ok { - // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function - if timestamp, ok := record["timestamp"].(string); ok { - if t, err := time.Parse(time.RFC3339, timestamp); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) - } else { - r.logger.Error("error parsing time", zap.Error(err)) - return plog.Logs{}, err - } - } - if level, ok := record["level"].(string); ok { - logRecord.SetSeverityNumber(severityTextToNumber(strings.ToUpper(level))) - logRecord.SetSeverityText(logRecord.SeverityNumber().String()) - } - if requestId, ok := record["requestId"].(string); ok { - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) - } else if r.currentFaasInvocationID != "" { - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID) - } - if line, ok := record["message"].(string); ok { - logRecord.Body().SetStr(line) - } - } else { - if r.currentFaasInvocationID != "" { - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID) - } - // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function - if line, ok := el.Record.(string); ok { - logRecord.Body().SetStr(line) + logRecord := scopeLog.LogRecords().AppendEmpty() + logRecord.Attributes().PutStr("type", el.Type) + if t, err := time.Parse(time.RFC3339, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err + } + if record, ok := el.Record.(map[string]interface{}); ok { + // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if timestamp, ok := record["timestamp"].(string); ok { + if t, err := time.Parse(time.RFC3339, timestamp); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err } } - } else { // platform events, if subscribed to - if el.Type == string(telemetryapi.PlatformStart) { - if record, ok := el.Record.(map[string]interface{}); ok { - if requestId, ok := record["requestId"].(string); ok { - r.currentFaasInvocationID = requestId - } + if level, ok := record["level"].(string); ok { + logRecord.SetSeverityNumber(severityTextToNumber(strings.ToUpper(level))) + logRecord.SetSeverityText(logRecord.SeverityNumber().String()) + } + + requestId := r.getRecordRequestId(record) + if requestId != "" { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + + // If this is the first event in the invocation with a request id (typically "platform.start"), + // set the current invocation id to this request id. + if r.currentFaasInvocationID == "" { + r.currentFaasInvocationID = requestId } - } else if el.Type == string(telemetryapi.PlatformRuntimeDone) { - r.currentFaasInvocationID = "" - } else if el.Type == string(telemetryapi.PlatformReport) && r.logReport { - if record, ok := el.Record.(map[string]interface{}); ok { - if logRecord := createReportLogRecord(&scopeLog, record); logRecord != nil { - logRecord.Attributes().PutStr("type", el.Type) - if t, err := time.Parse(time.RFC3339, el.Time); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) - } - } + } + + if line, ok := record["message"].(string); ok { + logRecord.Body().SetStr(line) + } else if el.Type == string(telemetryapi.PlatformReport) { + platformReportMessage := createPlatformReportMessage(requestId, record) + if platformReportMessage != "" { + logRecord.Body().SetStr(platformReportMessage) } } + } else { + if r.currentFaasInvocationID != "" { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID) + } + // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if line, ok := el.Record.(string); ok { + logRecord.Body().SetStr(line) + } + } + if el.Type == string(telemetryapi.PlatformRuntimeDone) { + r.currentFaasInvocationID = "" } } return log, nil } -// createReportLogRecord creates a log record for the platform.report event -// returns the log record if successful, otherwise nil -func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface{}) *plog.LogRecord { +func createPlatformReportMessage(requestId string, record map[string]interface{}) string { // gathering metrics metrics, ok := record["metrics"].(map[string]interface{}) if !ok { - return nil + return "" } var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64 if durationMs, ok = metrics[string(telemetryapi.MetricDurationMs)].(float64); !ok { - return nil + return "" } if billedDurationMs, ok = metrics[string(telemetryapi.MetricBilledDurationMs)].(float64); !ok { - return nil + return "" } if memorySizeMB, ok = metrics[string(telemetryapi.MetricMemorySizeMB)].(float64); !ok { - return nil + return "" } if maxMemoryUsedMB, ok = metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); !ok { - return nil + return "" } // optionally gather information about cold start time @@ -292,18 +292,7 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface } } - // gathering requestId - requestId := "" - if requestId, ok = record["requestId"].(string); !ok { - return nil - } - - // we have all information available, we can create the log record - logRecord := scopeLog.LogRecords().AppendEmpty() - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) - - // building the body of the log record, optionally adding the init duration - body := fmt.Sprintf( + message := fmt.Sprintf( logReportFmt, requestId, durationMs, @@ -312,11 +301,10 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface maxMemoryUsedMB, ) if initDurationMs > 0 { - body += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs) + message += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs) } - logRecord.Body().SetStr(body) - return &logRecord + return message } func severityTextToNumber(severityText string) plog.SeverityNumber { From af9493ebdb94f0706127e8f1815b35472921df04 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Sun, 7 Dec 2025 19:17:29 -0500 Subject: [PATCH 02/12] minor change to updating initial request id --- collector/receiver/telemetryapireceiver/receiver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 7be02eb5cb..cfcb50e305 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -233,9 +233,9 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { if requestId != "" { logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) - // If this is the first event in the invocation with a request id (typically "platform.start"), + // If this is the first event in the invocation with a request id (i.e. the "platform.start" event), // set the current invocation id to this request id. - if r.currentFaasInvocationID == "" { + if el.Type == string(telemetryapi.PlatformStart) { r.currentFaasInvocationID = requestId } } From eb85f59f831070860e52a8033f680bc0e157d15d Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 8 Dec 2025 15:28:39 -0500 Subject: [PATCH 03/12] update unit tests --- .../internal/telemetryapi/listener_test.go | 8 +- .../receiver/telemetryapireceiver/config.go | 2 +- .../receiver/telemetryapireceiver/receiver.go | 40 +- .../telemetryapireceiver/receiver_test.go | 401 ++++++++++++------ 4 files changed, 295 insertions(+), 156 deletions(-) diff --git a/collector/internal/telemetryapi/listener_test.go b/collector/internal/telemetryapi/listener_test.go index 230bd919cc..b9dd48667a 100644 --- a/collector/internal/telemetryapi/listener_test.go +++ b/collector/internal/telemetryapi/listener_test.go @@ -45,11 +45,6 @@ func setupListener(t *testing.T) (*Listener, string) { address, err := listener.Start() require.NoError(t, err) - - t.Cleanup(func() { - listener.Shutdown() - }) - return listener, address } @@ -183,6 +178,7 @@ func TestListenOnAddress(t *testing.T) { func TestListener_StartAndShutdown(t *testing.T) { listener, address := setupListener(t) + defer listener.Shutdown() require.NotEqual(t, address, "", "Start() should not return an empty address") require.True(t, strings.HasPrefix(address, "http://"), "Address should start with http://") require.NotNil(t, listener.httpServer, "httpServer should not be nil") @@ -241,6 +237,7 @@ func TestListener_httpHandler(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { listener, address := setupListener(t) + defer listener.Shutdown() submitEvents(t, address, test.events) require.EventuallyWithT(t, func(c *assert.CollectT) { require.Equal(c, test.expectedCount, listener.queue.Len()) @@ -302,6 +299,7 @@ func TestListener_Wait_Success(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { listener, address := setupListener(t) + defer listener.Shutdown() waitDone := make(chan error, 1) go func() { diff --git a/collector/receiver/telemetryapireceiver/config.go b/collector/receiver/telemetryapireceiver/config.go index 246b8dde39..c9c5260378 100644 --- a/collector/receiver/telemetryapireceiver/config.go +++ b/collector/receiver/telemetryapireceiver/config.go @@ -23,7 +23,7 @@ type Config struct { extensionID string Port int `mapstructure:"port"` Types []string `mapstructure:"types"` - LogReport bool `mapstructure:"log_report"` + LogReport *bool `mapstructure:"log_report"` } // Validate validates the configuration by checking for missing or invalid fields diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index cfcb50e305..b11e12fcc3 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -205,6 +205,9 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { scopeLog.Scope().SetName(scopeName) for _, el := range slice { r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) + if !r.logReport && el.Type == string(telemetryapi.PlatformReport) { + continue + } logRecord := scopeLog.LogRecords().AppendEmpty() logRecord.Attributes().PutStr("type", el.Type) if t, err := time.Parse(time.RFC3339, el.Time); err == nil { @@ -215,6 +218,17 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { return plog.Logs{}, err } if record, ok := el.Record.(map[string]interface{}); ok { + requestId := r.getRecordRequestId(record) + if requestId != "" { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + + // If this is the first event in the invocation with a request id (i.e. the "platform.start" event), + // set the current invocation id to this request id. + if el.Type == string(telemetryapi.PlatformStart) { + r.currentFaasInvocationID = requestId + } + } + // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function if timestamp, ok := record["timestamp"].(string); ok { if t, err := time.Parse(time.RFC3339, timestamp); err == nil { @@ -229,24 +243,13 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { logRecord.SetSeverityText(logRecord.SeverityNumber().String()) } - requestId := r.getRecordRequestId(record) - if requestId != "" { - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) - - // If this is the first event in the invocation with a request id (i.e. the "platform.start" event), - // set the current invocation id to this request id. - if el.Type == string(telemetryapi.PlatformStart) { - r.currentFaasInvocationID = requestId - } - } - - if line, ok := record["message"].(string); ok { - logRecord.Body().SetStr(line) - } else if el.Type == string(telemetryapi.PlatformReport) { + if el.Type == string(telemetryapi.PlatformReport) { platformReportMessage := createPlatformReportMessage(requestId, record) if platformReportMessage != "" { logRecord.Body().SetStr(platformReportMessage) } + } else if line, ok := record["message"].(string); ok { + logRecord.Body().SetStr(line) } } else { if r.currentFaasInvocationID != "" { @@ -406,7 +409,7 @@ func newTelemetryAPIReceiver( } } - subscribedTypes := []telemetryapi.EventType{} + var subscribedTypes []telemetryapi.EventType for _, val := range cfg.Types { switch val { case "platform": @@ -418,6 +421,11 @@ func newTelemetryAPIReceiver( } } + logReport := true + if cfg.LogReport != nil { + logReport = *cfg.LogReport + } + return &telemetryAPIReceiver{ logger: set.Logger, queue: queue.New(initialQueueSize), @@ -425,7 +433,7 @@ func newTelemetryAPIReceiver( port: cfg.Port, types: subscribedTypes, resource: r, - logReport: cfg.LogReport, + logReport: logReport, }, nil } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 346ec2fc2b..8f7516a7e2 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -174,23 +174,26 @@ func TestCreatePlatformInitSpan(t *testing.T) { func TestCreateLogs(t *testing.T) { t.Parallel() + type logInfo struct { + logType string + timestamp string + body string + severityText string + containsRequestId bool + requestId string + severityNumber plog.SeverityNumber + } + testCases := []struct { - desc string - slice []event - expectedLogRecords int - expectedType string - expectedTimestamp string - expectedBody string - expectedSeverityText string - expectedContainsRequestId bool - expectedRequestId string - expectedSeverityNumber plog.SeverityNumber - expectError bool + desc string + slice []event + expectedLogs []logInfo + expectError bool }{ { - desc: "no slice", - expectedLogRecords: 0, - expectError: false, + desc: "no slice", + expectedLogs: []logInfo{}, + expectError: false, }, { desc: "Invalid Timestamp", @@ -212,14 +215,16 @@ func TestCreateLogs(t *testing.T) { Record: "[INFO] Hello world, I am an extension!", }, }, - expectedLogRecords: 1, - expectedType: "function", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "[INFO] Hello world, I am an extension!", - expectedContainsRequestId: false, - expectedSeverityText: "", - expectedSeverityNumber: plog.SeverityNumberUnspecified, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "function", + timestamp: "2022-10-12T00:03:50.000Z", + body: "[INFO] Hello world, I am an extension!", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { desc: "function text with requestId", @@ -242,15 +247,33 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 1, - expectedType: "function", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "[INFO] Hello world, I am an extension!", - expectedContainsRequestId: true, - expectedRequestId: "34472c47-5ff0-4df5-a9ad-03776afa5473", - expectedSeverityText: "", - expectedSeverityNumber: plog.SeverityNumberUnspecified, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.start", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: true, + requestId: "34472c47-5ff0-4df5-a9ad-03776afa5473", + severityNumber: plog.SeverityNumberUnspecified, + }, + { + logType: "function", + timestamp: "2022-10-12T00:03:50.000Z", + body: "[INFO] Hello world, I am an extension!", + containsRequestId: true, + requestId: "34472c47-5ff0-4df5-a9ad-03776afa5473", + severityNumber: plog.SeverityNumberUnspecified, + }, + { + logType: "platform.runtimeDone", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: true, + requestId: "34472c47-5ff0-4df5-a9ad-03776afa5473", + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { desc: "function json", @@ -266,15 +289,18 @@ func TestCreateLogs(t *testing.T) { }, }, }, - expectedLogRecords: 1, - expectedType: "function", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "Hello world, I am a function!", - expectedContainsRequestId: true, - expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68189", - expectedSeverityText: "Info", - expectedSeverityNumber: plog.SeverityNumberInfo, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "function", + timestamp: "2022-10-12T00:03:50.000Z", + body: "Hello world, I am a function!", + containsRequestId: true, + requestId: "79b4f56e-95b1-4643-9700-2807f4e68189", + severityText: "Info", + severityNumber: plog.SeverityNumberInfo, + }, + }, + expectError: false, }, { desc: "extension text", @@ -285,14 +311,16 @@ func TestCreateLogs(t *testing.T) { Record: "[INFO] Hello world, I am an extension!", }, }, - expectedLogRecords: 1, - expectedType: "extension", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "[INFO] Hello world, I am an extension!", - expectedContainsRequestId: false, - expectedSeverityText: "", - expectedSeverityNumber: plog.SeverityNumberUnspecified, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "extension", + timestamp: "2022-10-12T00:03:50.000Z", + body: "[INFO] Hello world, I am an extension!", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { desc: "extension json", @@ -308,15 +336,18 @@ func TestCreateLogs(t *testing.T) { }, }, }, - expectedLogRecords: 1, - expectedType: "extension", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "Hello world, I am an extension!", - expectedContainsRequestId: true, - expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", - expectedSeverityText: "Info", - expectedSeverityNumber: plog.SeverityNumberInfo, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "extension", + timestamp: "2022-10-12T00:03:50.000Z", + body: "Hello world, I am an extension!", + containsRequestId: true, + requestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + severityText: "Info", + severityNumber: plog.SeverityNumberInfo, + }, + }, + expectError: false, }, { desc: "extension json anything", @@ -332,18 +363,21 @@ func TestCreateLogs(t *testing.T) { }, }, }, - expectedLogRecords: 1, - expectedType: "extension", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "Hello world, I am an extension!", - expectedContainsRequestId: true, - expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", - expectedSeverityText: "Unspecified", - expectedSeverityNumber: plog.SeverityNumberUnspecified, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "extension", + timestamp: "2022-10-12T00:03:50.000Z", + body: "Hello world, I am an extension!", + containsRequestId: true, + requestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + severityText: "Unspecified", + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.initStart anything", + desc: "platform.initStart", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -351,11 +385,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.initStart", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.initRuntimeDone anything", + desc: "platform.initRuntimeDone", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -363,11 +405,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.initRuntimeDone", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.initReport anything", + desc: "platform.initReport", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -375,11 +425,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.initReport", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.start anything", + desc: "platform.start", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -389,11 +447,20 @@ func TestCreateLogs(t *testing.T) { }, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.start", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: true, + requestId: "test-id", + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.runtimeDone anything", + desc: "platform.runtimeDone", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -401,11 +468,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.runtimeDone", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.report anything", + desc: "platform.report", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -413,11 +488,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.report", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.restoreStart anything", + desc: "platform.restoreStart", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -425,11 +508,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.restoreStart", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.restoreRuntimeDone anything", + desc: "platform.restoreRuntimeDone", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -437,23 +528,39 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.restoreRuntimeDone", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.restoreReport anything", + desc: "platform.restoreReport", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", - Type: "platform.restoreStart", + Type: "platform.restoreReport", Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.restoreReport", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.telemetrySubscription anything", + desc: "platform.telemetrySubscription", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -461,11 +568,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.telemetrySubscription", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.logsDropped anything", + desc: "platform.logsDropped", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -473,10 +588,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.logsDropped", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, } + for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { r, err := newTelemetryAPIReceiver( @@ -487,30 +611,36 @@ func TestCreateLogs(t *testing.T) { log, err := r.createLogs(tc.slice) if tc.expectError { require.Error(t, err) - } else { - require.Equal(t, 1, log.ResourceLogs().Len()) - resourceLog := log.ResourceLogs().At(0) - require.Equal(t, 1, resourceLog.ScopeLogs().Len()) - scopeLog := resourceLog.ScopeLogs().At(0) - require.Equal(t, scopeName, scopeLog.Scope().Name()) - require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len()) - if scopeLog.LogRecords().Len() > 0 { - logRecord := scopeLog.LogRecords().At(0) - attr, ok := logRecord.Attributes().Get("type") - require.True(t, ok) - require.Equal(t, tc.expectedType, attr.Str()) - expectedTime, err := time.Parse(time.RFC3339, tc.expectedTimestamp) - require.NoError(t, err) - require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) - requestId, ok := logRecord.Attributes().Get(semconv.AttributeFaaSInvocationID) - require.Equal(t, tc.expectedContainsRequestId, ok) - if ok { - require.Equal(t, tc.expectedRequestId, requestId.Str()) - } - require.Equal(t, tc.expectedSeverityText, logRecord.SeverityText()) - require.Equal(t, tc.expectedSeverityNumber, logRecord.SeverityNumber()) - require.Equal(t, tc.expectedBody, logRecord.Body().Str()) + return + } + require.NoError(t, err) + require.Equal(t, 1, log.ResourceLogs().Len()) + resourceLog := log.ResourceLogs().At(0) + require.Equal(t, 1, resourceLog.ScopeLogs().Len()) + scopeLog := resourceLog.ScopeLogs().At(0) + require.Equal(t, scopeName, scopeLog.Scope().Name()) + require.Equal(t, len(tc.expectedLogs), scopeLog.LogRecords().Len()) + + for i, expected := range tc.expectedLogs { + logRecord := scopeLog.LogRecords().At(i) + + attr, ok := logRecord.Attributes().Get("type") + require.True(t, ok) + require.Equal(t, expected.logType, attr.Str()) + + expectedTime, err := time.Parse(time.RFC3339, expected.timestamp) + require.NoError(t, err) + require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) + + requestId, ok := logRecord.Attributes().Get(semconv.AttributeFaaSInvocationID) + require.Equal(t, expected.containsRequestId, ok) + if ok { + require.Equal(t, expected.requestId, requestId.Str()) } + + require.Equal(t, expected.severityText, logRecord.SeverityText()) + require.Equal(t, expected.severityNumber, logRecord.SeverityNumber()) + require.Equal(t, expected.body, logRecord.Body().Str()) } }) } @@ -520,15 +650,15 @@ func TestCreateLogsWithLogReport(t *testing.T) { t.Parallel() testCases := []struct { - desc string - slice []event - logReport bool - expectedLogRecords int - expectedType string - expectedTimestamp string - expectedBody string - expectedAttributes map[string]interface{} - expectError bool + desc string + slice []event + logReport bool + expectedLogRecords int + expectedType string + expectedTimestamp string + expectedBody string + expectedAttributes map[string]interface{} + expectError bool }{ { desc: "platform.report with logReport enabled - valid metrics", @@ -662,8 +792,11 @@ func TestCreateLogsWithLogReport(t *testing.T) { }, }, logReport: true, - expectedLogRecords: 0, + expectedLogRecords: 1, expectError: false, + expectedType: "platform.report", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "invalid record format", }, { desc: "platform.report with logReport enabled - with initDurationMs", @@ -719,7 +852,7 @@ func TestCreateLogsWithLogReport(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { r, err := newTelemetryAPIReceiver( - &Config{LogReport: tc.logReport}, + &Config{LogReport: &tc.logReport}, receivertest.NewNopSettings(Type), ) require.NoError(t, err) From d3eabefb19da559f1be34cdb05a6d058385b4949 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 8 Dec 2025 15:33:15 -0500 Subject: [PATCH 04/12] lint project --- collector/lambdacomponents/receiver/telemetryapi.go | 3 ++- collector/receiver/telemetryapireceiver/receiver.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/collector/lambdacomponents/receiver/telemetryapi.go b/collector/lambdacomponents/receiver/telemetryapi.go index d0ffe066fd..ed01a8daf1 100644 --- a/collector/lambdacomponents/receiver/telemetryapi.go +++ b/collector/lambdacomponents/receiver/telemetryapi.go @@ -17,8 +17,9 @@ package receiver import ( - "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" "go.opentelemetry.io/collector/receiver" + + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" ) func init() { diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index b11e12fcc3..a8e73cef4c 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -204,10 +204,10 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { scopeLog := resourceLog.ScopeLogs().AppendEmpty() scopeLog.Scope().SetName(scopeName) for _, el := range slice { - r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) if !r.logReport && el.Type == string(telemetryapi.PlatformReport) { continue } + r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) logRecord := scopeLog.LogRecords().AppendEmpty() logRecord.Attributes().PutStr("type", el.Type) if t, err := time.Parse(time.RFC3339, el.Time); err == nil { From 1484bd14f69ee05e99a43c858e6b185fd199e02a Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 8 Dec 2025 18:10:16 -0500 Subject: [PATCH 05/12] update logging --- collector/internal/telemetryapi/types.go | 24 +- .../receiver/telemetryapireceiver/receiver.go | 113 +++++- .../telemetryapireceiver/receiver_test.go | 381 ++++++++++++++++++ 3 files changed, 505 insertions(+), 13 deletions(-) diff --git a/collector/internal/telemetryapi/types.go b/collector/internal/telemetryapi/types.go index 1cd4583c9b..d5bd0fe427 100644 --- a/collector/internal/telemetryapi/types.go +++ b/collector/internal/telemetryapi/types.go @@ -24,15 +24,29 @@ const ( PlatformInitStart EventType = Platform + ".initStart" // PlatformInitRuntimeDone is used when function initialization ended. PlatformInitRuntimeDone EventType = Platform + ".initRuntimeDone" - // PlatformReport is used when a report of function invocation is received. - PlatformReport EventType = Platform + ".report" - // Function invocation started. + // PlatformInitReport is used when a report of function initialization is received. + PlatformInitReport EventType = Platform + ".initReport" + // PlatformStart is used when function invocation started. PlatformStart EventType = Platform + ".start" - // The runtime finished processing an event with either success or failure. + // PlatformRuntimeDone is used when the runtime finished processing an event with either success or failure. PlatformRuntimeDone EventType = Platform + ".runtimeDone" + // PlatformReport is used when a report of function invocation is received. + PlatformReport EventType = Platform + ".report" + // PlatformRestoreStart is used when runtime restore started. + PlatformRestoreStart EventType = Platform + ".restoreStart" + // PlatformRestoreRuntimeDone is used when runtime restore completed. + PlatformRestoreRuntimeDone EventType = Platform + ".restoreRuntimeDone" + // PlatformRestoreReport is used when a report of runtime restore is received. + PlatformRestoreReport EventType = Platform + ".restoreReport" + // PlatformExtension is used for extension state events. + PlatformExtension EventType = Platform + ".extension" + // PlatformTelemetrySubscription is used when the extension subscribed to the Telemetry API. + PlatformTelemetrySubscription EventType = Platform + ".telemetrySubscription" + // PlatformLogsDropped is used when Lambda dropped log entries. + PlatformLogsDropped EventType = Platform + ".logsDropped" // Function is used to receive log events emitted by the function Function EventType = "function" - // Extension is used is to receive log events emitted by the extension + // Extension is used to receive log events emitted by the extension Extension EventType = "extension" ) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index a8e73cef4c..4c2307d39c 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -42,9 +42,20 @@ import ( ) const ( - initialQueueSize = 5 - scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" - logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB" + initialQueueSize = 5 + scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" + platformReportLogFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB" + platformStartLogFmt = "START RequestId: %s Version: %s" + platformRuntimeDoneLogFmt = "END RequestId: %s Version: %s" + platformInitStartLogFmt = "INIT_START Runtime Version: %s Runtime Version ARN: %s" + platformInitRuntimeDoneLogFmt = "INIT_RUNTIME_DONE Status: %s" + platformInitReportLogFmt = "INIT_REPORT Initialization Type: %s Phase: %s Status: %s Duration: %.2f ms" + platformRestoreStartLogFmt = "RESTORE_START Runtime Version: %s Runtime Version ARN: %s" + platformRestoreRuntimeDoneLogFmt = "RESTORE_RUNTIME_DONE Status: %s" + platformRestoreReportLogFmt = "RESTORE_REPORT Status: %s Duration: %.2f ms" + platformTelemetrySubscriptionLogFmt = "TELEMETRY: %s Subscribed Types: %v" + platformExtensionLogFmt = "EXTENSION Name: %s State: %s Events: %v" + platformLogsDroppedLogFmt = "LOGS_DROPPED DroppedRecords: %d DroppedBytes: %d Reason: %s" ) type telemetryAPIReceiver struct { @@ -59,6 +70,7 @@ type telemetryAPIReceiver struct { port int types []telemetryapi.EventType resource pcommon.Resource + faasFunctionVersion string currentFaasInvocationID string logReport bool } @@ -243,10 +255,17 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { logRecord.SetSeverityText(logRecord.SeverityNumber().String()) } - if el.Type == string(telemetryapi.PlatformReport) { - platformReportMessage := createPlatformReportMessage(requestId, record) - if platformReportMessage != "" { - logRecord.Body().SetStr(platformReportMessage) + if strings.HasPrefix(el.Type, platform) { + if el.Type == string(telemetryapi.PlatformInitStart) { + functionVersion, _ := record["functionVersion"].(string) + if functionVersion != "" { + r.faasFunctionVersion = functionVersion + } + } + + message := createPlatformMessage(requestId, r.faasFunctionVersion, el.Type, record) + if message != "" { + logRecord.Body().SetStr(message) } } else if line, ok := record["message"].(string); ok { logRecord.Body().SetStr(line) @@ -267,6 +286,84 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { return log, nil } +func createPlatformMessage(requestId string, functionVersion string, eventType string, record map[string]interface{}) string { + switch eventType { + case string(telemetryapi.PlatformStart): + if requestId != "" && functionVersion != "" { + return fmt.Sprintf(platformStartLogFmt, requestId, functionVersion) + } + case string(telemetryapi.PlatformRuntimeDone): + if requestId != "" && functionVersion != "" { + return fmt.Sprintf(platformRuntimeDoneLogFmt, requestId, functionVersion) + } + case string(telemetryapi.PlatformReport): + return createPlatformReportMessage(requestId, record) + case string(telemetryapi.PlatformInitStart): + runtimeVersion, _ := record["runtimeVersion"].(string) + runtimeVersionArn, _ := record["runtimeVersionArn"].(string) + if runtimeVersion != "" || runtimeVersionArn != "" { + return fmt.Sprintf(platformInitStartLogFmt, runtimeVersion, runtimeVersionArn) + } + case string(telemetryapi.PlatformInitRuntimeDone): + status, _ := record["status"].(string) + if status != "" { + return fmt.Sprintf(platformInitRuntimeDoneLogFmt, status) + } + case string(telemetryapi.PlatformInitReport): + initType, _ := record["initializationType"].(string) + phase, _ := record["phase"].(string) + status, _ := record["status"].(string) + var durationMs float64 + if metrics, ok := record["metrics"].(map[string]interface{}); ok { + durationMs, _ = metrics["durationMs"].(float64) + } + if initType != "" || phase != "" || status != "" || durationMs != 0 { + return fmt.Sprintf(platformInitReportLogFmt, initType, phase, status, durationMs) + } + case string(telemetryapi.PlatformRestoreStart): + runtimeVersion, _ := record["runtimeVersion"].(string) + runtimeVersionArn, _ := record["runtimeVersionArn"].(string) + if runtimeVersion != "" || runtimeVersionArn != "" { + return fmt.Sprintf(platformRestoreStartLogFmt, runtimeVersion, runtimeVersionArn) + } + case string(telemetryapi.PlatformRestoreRuntimeDone): + status, _ := record["status"].(string) + if status != "" { + return fmt.Sprintf(platformRestoreRuntimeDoneLogFmt, status) + } + case string(telemetryapi.PlatformRestoreReport): + status, _ := record["status"].(string) + var durationMs float64 + if metrics, ok := record["metrics"].(map[string]interface{}); ok { + durationMs, _ = metrics["durationMs"].(float64) + } + if status != "" && durationMs != 0 { + return fmt.Sprintf(platformRestoreReportLogFmt, status, durationMs) + } + case string(telemetryapi.PlatformTelemetrySubscription): + name, _ := record["name"].(string) + types, _ := record["types"].([]interface{}) + if name != "" { + return fmt.Sprintf(platformTelemetrySubscriptionLogFmt, name, types) + } + case string(telemetryapi.PlatformExtension): + name, _ := record["name"].(string) + state, _ := record["state"].(string) + events, _ := record["events"].([]interface{}) + if name != "" { + return fmt.Sprintf(platformExtensionLogFmt, name, state, events) + } + case string(telemetryapi.PlatformLogsDropped): + droppedRecords, _ := record["droppedRecords"].(int64) + droppedBytes, _ := record["droppedBytes"].(int64) + reason, _ := record["reason"].(string) + if reason != "" { + return fmt.Sprintf(platformLogsDroppedLogFmt, int(droppedRecords), int(droppedBytes), reason) + } + } + return "" +} + func createPlatformReportMessage(requestId string, record map[string]interface{}) string { // gathering metrics metrics, ok := record["metrics"].(map[string]interface{}) @@ -296,7 +393,7 @@ func createPlatformReportMessage(requestId string, record map[string]interface{} } message := fmt.Sprintf( - logReportFmt, + platformReportLogFmt, requestId, durationMs, billedDurationMs, diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 8f7516a7e2..ca53a47bdc 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -887,6 +887,387 @@ func TestCreateLogsWithLogReport(t *testing.T) { } } +func TestCreatePlatformMessage(t *testing.T) { + t.Parallel() + + testCases := []struct { + desc string + requestId string + functionVersion string + eventType string + record map[string]interface{} + expected string + }{ + { + desc: "platform.start with requestId and functionVersion", + requestId: "test-request-id", + functionVersion: "$LATEST", + eventType: "platform.start", + record: map[string]interface{}{}, + expected: "START RequestId: test-request-id Version: $LATEST", + }, + { + desc: "platform.start with empty requestId", + requestId: "", + functionVersion: "$LATEST", + eventType: "platform.start", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.start with empty functionVersion", + requestId: "test-request-id", + functionVersion: "", + eventType: "platform.start", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.runtimeDone with requestId and functionVersion", + requestId: "test-request-id", + functionVersion: "v1.0.0", + eventType: "platform.runtimeDone", + record: map[string]interface{}{}, + expected: "END RequestId: test-request-id Version: v1.0.0", + }, + { + desc: "platform.runtimeDone with empty requestId", + requestId: "", + functionVersion: "v1.0.0", + eventType: "platform.runtimeDone", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.runtimeDone with empty functionVersion", + requestId: "test-request-id", + functionVersion: "", + eventType: "platform.runtimeDone", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.report with valid metrics", + requestId: "test-request-id", + functionVersion: "$LATEST", + eventType: "platform.report", + record: map[string]interface{}{ + "metrics": map[string]interface{}{ + "durationMs": 100.5, + "billedDurationMs": 101.0, + "memorySizeMB": 128.0, + "maxMemoryUsedMB": 64.0, + }, + }, + expected: "REPORT RequestId: test-request-id Duration: 100.50 ms Billed Duration: 101 ms Memory Size: 128 MB Max Memory Used: 64 MB", + }, + { + desc: "platform.report with missing metrics", + requestId: "test-request-id", + functionVersion: "$LATEST", + eventType: "platform.report", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.initStart with runtimeVersion and runtimeVersionArn", + requestId: "", + functionVersion: "", + eventType: "platform.initStart", + record: map[string]interface{}{ + "runtimeVersion": "python:3.9", + "runtimeVersionArn": "arn:aws:lambda:us-east-1::runtime:python:3.9", + }, + expected: "INIT_START Runtime Version: python:3.9 Runtime Version ARN: arn:aws:lambda:us-east-1::runtime:python:3.9", + }, + { + desc: "platform.initStart with only runtimeVersion", + requestId: "", + functionVersion: "", + eventType: "platform.initStart", + record: map[string]interface{}{ + "runtimeVersion": "nodejs:18", + }, + expected: "INIT_START Runtime Version: nodejs:18 Runtime Version ARN: ", + }, + { + desc: "platform.initStart with only runtimeVersionArn", + requestId: "", + functionVersion: "", + eventType: "platform.initStart", + record: map[string]interface{}{ + "runtimeVersionArn": "arn:aws:lambda:us-east-1::runtime:go:1.x", + }, + expected: "INIT_START Runtime Version: Runtime Version ARN: arn:aws:lambda:us-east-1::runtime:go:1.x", + }, + { + desc: "platform.initStart with empty record", + requestId: "", + functionVersion: "", + eventType: "platform.initStart", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.initRuntimeDone with status success", + requestId: "", + functionVersion: "", + eventType: "platform.initRuntimeDone", + record: map[string]interface{}{ + "status": "success", + }, + expected: "INIT_RUNTIME_DONE Status: success", + }, + { + desc: "platform.initRuntimeDone with status failure", + requestId: "", + functionVersion: "", + eventType: "platform.initRuntimeDone", + record: map[string]interface{}{ + "status": "failure", + }, + expected: "INIT_RUNTIME_DONE Status: failure", + }, + { + desc: "platform.initRuntimeDone with empty status", + requestId: "", + functionVersion: "", + eventType: "platform.initRuntimeDone", + record: map[string]interface{}{ + "status": "", + }, + expected: "", + }, + { + desc: "platform.initRuntimeDone with missing status", + requestId: "", + functionVersion: "", + eventType: "platform.initRuntimeDone", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.initReport with all fields", + requestId: "", + functionVersion: "", + eventType: "platform.initReport", + record: map[string]interface{}{ + "initializationType": "on-demand", + "phase": "init", + "status": "success", + "metrics": map[string]interface{}{ + "durationMs": 250.75, + }, + }, + expected: "INIT_REPORT Initialization Type: on-demand Phase: init Status: success Duration: 250.75 ms", + }, + { + desc: "platform.initReport with provisioned-concurrency", + requestId: "", + functionVersion: "", + eventType: "platform.initReport", + record: map[string]interface{}{ + "initializationType": "provisioned-concurrency", + "phase": "init", + "status": "success", + "metrics": map[string]interface{}{ + "durationMs": 100.0, + }, + }, + expected: "INIT_REPORT Initialization Type: provisioned-concurrency Phase: init Status: success Duration: 100.00 ms", + }, + { + desc: "platform.initReport with empty record", + requestId: "", + functionVersion: "", + eventType: "platform.initReport", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.initReport with only initType", + requestId: "", + functionVersion: "", + eventType: "platform.initReport", + record: map[string]interface{}{ + "initializationType": "on-demand", + }, + expected: "INIT_REPORT Initialization Type: on-demand Phase: Status: Duration: 0.00 ms", + }, + { + desc: "platform.restoreStart with runtimeVersion and runtimeVersionArn", + requestId: "", + functionVersion: "", + eventType: "platform.restoreStart", + record: map[string]interface{}{ + "runtimeVersion": "python:3.9", + "runtimeVersionArn": "arn:aws:lambda:us-east-1::runtime:python:3.9", + }, + expected: "RESTORE_START Runtime Version: python:3.9 Runtime Version ARN: arn:aws:lambda:us-east-1::runtime:python:3.9", + }, + { + desc: "platform.restoreStart with empty record", + requestId: "", + functionVersion: "", + eventType: "platform.restoreStart", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.restoreRuntimeDone with status", + requestId: "", + functionVersion: "", + eventType: "platform.restoreRuntimeDone", + record: map[string]interface{}{ + "status": "success", + }, + expected: "RESTORE_RUNTIME_DONE Status: success", + }, + { + desc: "platform.restoreRuntimeDone with empty status", + requestId: "", + functionVersion: "", + eventType: "platform.restoreRuntimeDone", + record: map[string]interface{}{ + "status": "", + }, + expected: "", + }, + { + desc: "platform.restoreReport with status and duration", + requestId: "", + functionVersion: "", + eventType: "platform.restoreReport", + record: map[string]interface{}{ + "status": "success", + "metrics": map[string]interface{}{ + "durationMs": 50.25, + }, + }, + expected: "RESTORE_REPORT Status: success Duration: 50.25 ms", + }, + { + desc: "platform.restoreReport with empty status", + requestId: "", + functionVersion: "", + eventType: "platform.restoreReport", + record: map[string]interface{}{ + "status": "", + "metrics": map[string]interface{}{ + "durationMs": 50.25, + }, + }, + expected: "", + }, + { + desc: "platform.restoreReport with zero duration", + requestId: "", + functionVersion: "", + eventType: "platform.restoreReport", + record: map[string]interface{}{ + "status": "success", + "metrics": map[string]interface{}{ + "durationMs": 0.0, + }, + }, + expected: "", + }, + { + desc: "platform.telemetrySubscription with name and types", + requestId: "", + functionVersion: "", + eventType: "platform.telemetrySubscription", + record: map[string]interface{}{ + "name": "my-extension", + "types": []interface{}{"platform", "function"}, + }, + expected: "TELEMETRY: my-extension Subscribed Types: [platform function]", + }, + { + desc: "platform.telemetrySubscription with empty name", + requestId: "", + functionVersion: "", + eventType: "platform.telemetrySubscription", + record: map[string]interface{}{ + "name": "", + "types": []interface{}{"platform"}, + }, + expected: "", + }, + { + desc: "platform.extension with all fields", + requestId: "", + functionVersion: "", + eventType: "platform.extension", + record: map[string]interface{}{ + "name": "my-extension", + "state": "Ready", + "events": []interface{}{"INVOKE", "SHUTDOWN"}, + }, + expected: "EXTENSION Name: my-extension State: Ready Events: [INVOKE SHUTDOWN]", + }, + { + desc: "platform.extension with empty name", + requestId: "", + functionVersion: "", + eventType: "platform.extension", + record: map[string]interface{}{ + "name": "", + "state": "Ready", + "events": []interface{}{"INVOKE"}, + }, + expected: "", + }, + { + desc: "platform.logsDropped with all fields", + requestId: "", + functionVersion: "", + eventType: "platform.logsDropped", + record: map[string]interface{}{ + "droppedRecords": int64(10), + "droppedBytes": int64(1024), + "reason": "Consumer is too slow", + }, + expected: "LOGS_DROPPED DroppedRecords: 10 DroppedBytes: 1024 Reason: Consumer is too slow", + }, + { + desc: "platform.logsDropped with empty reason", + requestId: "", + functionVersion: "", + eventType: "platform.logsDropped", + record: map[string]interface{}{ + "droppedRecords": int64(10), + "droppedBytes": int64(1024), + "reason": "", + }, + expected: "", + }, + { + desc: "unknown event type", + requestId: "test-id", + functionVersion: "v1", + eventType: "platform.unknown", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "function event type", + requestId: "test-id", + functionVersion: "v1", + eventType: "function", + record: map[string]interface{}{}, + expected: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + result := createPlatformMessage(tc.requestId, tc.functionVersion, tc.eventType, tc.record) + require.Equal(t, tc.expected, result) + }) + } +} + func TestSeverityTextToNumber(t *testing.T) { t.Parallel() From 0c425257307af02cecd2d5650b2f51c5887d3e15 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 8 Dec 2025 18:16:36 -0500 Subject: [PATCH 06/12] update dropped records integer type --- collector/receiver/telemetryapireceiver/receiver.go | 4 ++-- collector/receiver/telemetryapireceiver/receiver_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 4c2307d39c..150c5de07d 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -354,8 +354,8 @@ func createPlatformMessage(requestId string, functionVersion string, eventType s return fmt.Sprintf(platformExtensionLogFmt, name, state, events) } case string(telemetryapi.PlatformLogsDropped): - droppedRecords, _ := record["droppedRecords"].(int64) - droppedBytes, _ := record["droppedBytes"].(int64) + droppedRecords, _ := record["droppedRecords"].(int32) + droppedBytes, _ := record["droppedBytes"].(int32) reason, _ := record["reason"].(string) if reason != "" { return fmt.Sprintf(platformLogsDroppedLogFmt, int(droppedRecords), int(droppedBytes), reason) diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index ca53a47bdc..652c4e7597 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -1236,8 +1236,8 @@ func TestCreatePlatformMessage(t *testing.T) { functionVersion: "", eventType: "platform.logsDropped", record: map[string]interface{}{ - "droppedRecords": int64(10), - "droppedBytes": int64(1024), + "droppedRecords": int32(10), + "droppedBytes": int32(1024), "reason": "", }, expected: "", From b845057839645b4b68920316d60d56d43f6a5cee Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 8 Dec 2025 18:21:23 -0500 Subject: [PATCH 07/12] Update receiver types back to float64 --- collector/receiver/telemetryapireceiver/receiver.go | 4 ++-- collector/receiver/telemetryapireceiver/receiver_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 150c5de07d..674faf02cd 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -354,8 +354,8 @@ func createPlatformMessage(requestId string, functionVersion string, eventType s return fmt.Sprintf(platformExtensionLogFmt, name, state, events) } case string(telemetryapi.PlatformLogsDropped): - droppedRecords, _ := record["droppedRecords"].(int32) - droppedBytes, _ := record["droppedBytes"].(int32) + droppedRecords, _ := record["droppedRecords"].(float64) + droppedBytes, _ := record["droppedBytes"].(float64) reason, _ := record["reason"].(string) if reason != "" { return fmt.Sprintf(platformLogsDroppedLogFmt, int(droppedRecords), int(droppedBytes), reason) diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 652c4e7597..bcbb4f5c03 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -1236,8 +1236,8 @@ func TestCreatePlatformMessage(t *testing.T) { functionVersion: "", eventType: "platform.logsDropped", record: map[string]interface{}{ - "droppedRecords": int32(10), - "droppedBytes": int32(1024), + "droppedRecords": float64(10), + "droppedBytes": float64(1024), "reason": "", }, expected: "", From 0d5d01449be814b6f4f79395c82553a837035e4e Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 8 Dec 2025 18:35:09 -0500 Subject: [PATCH 08/12] fix failing test --- collector/receiver/telemetryapireceiver/receiver_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index bcbb4f5c03..37061b29e6 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -1224,8 +1224,8 @@ func TestCreatePlatformMessage(t *testing.T) { functionVersion: "", eventType: "platform.logsDropped", record: map[string]interface{}{ - "droppedRecords": int64(10), - "droppedBytes": int64(1024), + "droppedRecords": float64(10), + "droppedBytes": float64(1024), "reason": "Consumer is too slow", }, expected: "LOGS_DROPPED DroppedRecords: 10 DroppedBytes: 1024 Reason: Consumer is too slow", From cfe45dd6983664f2cbda2f61634f2151cf71459b Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Wed, 17 Dec 2025 16:50:10 -0500 Subject: [PATCH 09/12] revert changes to make LogReport default to true --- collector/internal/telemetryapi/listener_test.go | 1 - collector/receiver/telemetryapireceiver/config.go | 2 +- collector/receiver/telemetryapireceiver/receiver.go | 7 +------ collector/receiver/telemetryapireceiver/receiver_test.go | 2 +- 4 files changed, 3 insertions(+), 9 deletions(-) diff --git a/collector/internal/telemetryapi/listener_test.go b/collector/internal/telemetryapi/listener_test.go index b9dd48667a..2ddcaaacfb 100644 --- a/collector/internal/telemetryapi/listener_test.go +++ b/collector/internal/telemetryapi/listener_test.go @@ -189,7 +189,6 @@ func TestListener_StartAndShutdown(t *testing.T) { } else { require.NoError(t, resp.Body.Close()) } - listener.Shutdown() require.Nil(t, listener.httpServer, "httpServer should be nil after Shutdown()") } diff --git a/collector/receiver/telemetryapireceiver/config.go b/collector/receiver/telemetryapireceiver/config.go index c9c5260378..246b8dde39 100644 --- a/collector/receiver/telemetryapireceiver/config.go +++ b/collector/receiver/telemetryapireceiver/config.go @@ -23,7 +23,7 @@ type Config struct { extensionID string Port int `mapstructure:"port"` Types []string `mapstructure:"types"` - LogReport *bool `mapstructure:"log_report"` + LogReport bool `mapstructure:"log_report"` } // Validate validates the configuration by checking for missing or invalid fields diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 674faf02cd..be49616bd2 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -518,11 +518,6 @@ func newTelemetryAPIReceiver( } } - logReport := true - if cfg.LogReport != nil { - logReport = *cfg.LogReport - } - return &telemetryAPIReceiver{ logger: set.Logger, queue: queue.New(initialQueueSize), @@ -530,7 +525,7 @@ func newTelemetryAPIReceiver( port: cfg.Port, types: subscribedTypes, resource: r, - logReport: logReport, + logReport: cfg.LogReport, }, nil } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 37061b29e6..e4fde5a66f 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -852,7 +852,7 @@ func TestCreateLogsWithLogReport(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { r, err := newTelemetryAPIReceiver( - &Config{LogReport: &tc.logReport}, + &Config{LogReport: tc.logReport}, receivertest.NewNopSettings(Type), ) require.NoError(t, err) From ec82b6f19af11b390c474ca653b318e6a389a3b2 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Wed, 17 Dec 2025 18:04:27 -0500 Subject: [PATCH 10/12] fix failing tesT --- collector/internal/telemetryapi/listener_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/internal/telemetryapi/listener_test.go b/collector/internal/telemetryapi/listener_test.go index 2ddcaaacfb..74d6c7ff31 100644 --- a/collector/internal/telemetryapi/listener_test.go +++ b/collector/internal/telemetryapi/listener_test.go @@ -178,7 +178,6 @@ func TestListenOnAddress(t *testing.T) { func TestListener_StartAndShutdown(t *testing.T) { listener, address := setupListener(t) - defer listener.Shutdown() require.NotEqual(t, address, "", "Start() should not return an empty address") require.True(t, strings.HasPrefix(address, "http://"), "Address should start with http://") require.NotNil(t, listener.httpServer, "httpServer should not be nil") @@ -190,6 +189,7 @@ func TestListener_StartAndShutdown(t *testing.T) { require.NoError(t, resp.Body.Close()) } + listener.Shutdown() require.Nil(t, listener.httpServer, "httpServer should be nil after Shutdown()") } From c40b0b23f72e993aee17f2ee38589fb38228ad97 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Wed, 17 Dec 2025 18:21:37 -0500 Subject: [PATCH 11/12] fix receiver test errors --- collector/receiver/telemetryapireceiver/receiver_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index e4fde5a66f..30944d5fbc 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -604,7 +604,9 @@ func TestCreateLogs(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { r, err := newTelemetryAPIReceiver( - &Config{}, + &Config{ + LogReport: true, + }, receivertest.NewNopSettings(Type), ) require.NoError(t, err) From 0364b37637c9090802e23efc8efc8c6288751211 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Thu, 18 Dec 2025 13:33:25 -0500 Subject: [PATCH 12/12] add explicit case checks for float64 types --- .../receiver/telemetryapireceiver/receiver.go | 24 ++++++++++++------- .../telemetryapireceiver/receiver_test.go | 11 +++++++++ 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index be49616bd2..25670f81fd 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -55,7 +55,7 @@ const ( platformRestoreReportLogFmt = "RESTORE_REPORT Status: %s Duration: %.2f ms" platformTelemetrySubscriptionLogFmt = "TELEMETRY: %s Subscribed Types: %v" platformExtensionLogFmt = "EXTENSION Name: %s State: %s Events: %v" - platformLogsDroppedLogFmt = "LOGS_DROPPED DroppedRecords: %d DroppedBytes: %d Reason: %s" + platformLogsDroppedLogFmt = "LOGS_DROPPED DroppedRecords: %.0f DroppedBytes: %.0f Reason: %s" ) type telemetryAPIReceiver struct { @@ -314,10 +314,11 @@ func createPlatformMessage(requestId string, functionVersion string, eventType s phase, _ := record["phase"].(string) status, _ := record["status"].(string) var durationMs float64 + durationOk := false if metrics, ok := record["metrics"].(map[string]interface{}); ok { - durationMs, _ = metrics["durationMs"].(float64) + durationMs, durationOk = metrics["durationMs"].(float64) } - if initType != "" || phase != "" || status != "" || durationMs != 0 { + if initType != "" || phase != "" || status != "" || durationOk { return fmt.Sprintf(platformInitReportLogFmt, initType, phase, status, durationMs) } case string(telemetryapi.PlatformRestoreStart): @@ -334,10 +335,11 @@ func createPlatformMessage(requestId string, functionVersion string, eventType s case string(telemetryapi.PlatformRestoreReport): status, _ := record["status"].(string) var durationMs float64 + durationOk := false if metrics, ok := record["metrics"].(map[string]interface{}); ok { - durationMs, _ = metrics["durationMs"].(float64) + durationMs, durationOk = metrics["durationMs"].(float64) } - if status != "" && durationMs != 0 { + if status != "" && durationOk { return fmt.Sprintf(platformRestoreReportLogFmt, status, durationMs) } case string(telemetryapi.PlatformTelemetrySubscription): @@ -354,11 +356,17 @@ func createPlatformMessage(requestId string, functionVersion string, eventType s return fmt.Sprintf(platformExtensionLogFmt, name, state, events) } case string(telemetryapi.PlatformLogsDropped): - droppedRecords, _ := record["droppedRecords"].(float64) - droppedBytes, _ := record["droppedBytes"].(float64) + droppedRecords, ok := record["droppedRecords"].(float64) + if !ok { + return "" + } + droppedBytes, ok := record["droppedBytes"].(float64) + if !ok { + return "" + } reason, _ := record["reason"].(string) if reason != "" { - return fmt.Sprintf(platformLogsDroppedLogFmt, int(droppedRecords), int(droppedBytes), reason) + return fmt.Sprintf(platformLogsDroppedLogFmt, droppedRecords, droppedBytes, reason) } } return "" diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 30944d5fbc..b3beafd2bd 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -1172,6 +1172,17 @@ func TestCreatePlatformMessage(t *testing.T) { "durationMs": 0.0, }, }, + expected: "RESTORE_REPORT Status: success Duration: 0.00 ms", + }, + { + desc: "platform.restoreReport with no duration", + requestId: "", + functionVersion: "", + eventType: "platform.restoreReport", + record: map[string]interface{}{ + "status": "success", + "metrics": map[string]interface{}{}, + }, expected: "", }, {