From f62f93a8a16192011e02dbba83e18a9a0bdf8bd6 Mon Sep 17 00:00:00 2001 From: JJ Botha Date: Thu, 8 Jan 2026 15:33:39 -0500 Subject: [PATCH 1/2] Fix DSM queue names with Kafka Connect IBM MQ connectors When using Kafka Connect with IBM MQ connectors, DSM was reporting incorrect queue names with schema-derived suffixes like _messagebody_0. This occurred because Kafka Connect schema converters add index suffixes to field names for union/optional types. This fix sanitizes queue/topic names to remove these suffixes: - _messagebody_N - _text_N - _bytes_N - _map_N - _value_N Fixes Zendesk ticket #2429181 Co-Authored-By: Claude Opus 4.5 --- .../instrumentation/jms/JMSDecorator.java | 23 ++++- .../src/test/groovy/JMSDecoratorTest.groovy | 99 +++++++++++++++++++ 2 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 dd-java-agent/instrumentation/jms/javax-jms-1.1/src/test/groovy/JMSDecoratorTest.groovy diff --git a/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java b/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java index 9655dc7c6ea..8f8744a45fa 100644 --- a/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java +++ b/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java @@ -194,6 +194,23 @@ public void onTimeInQueue(AgentSpan span, CharSequence resourceName, String serv private static final String TIBCO_TMP_PREFIX = "$TMP$"; + // Pattern to match Kafka Connect schema-derived suffixes like _messagebody_0, _text_0, _bytes_0 + // These suffixes are added by Kafka Connect converters when handling union/optional fields + private static final java.util.regex.Pattern KAFKA_CONNECT_SCHEMA_SUFFIX_PATTERN = + java.util.regex.Pattern.compile("_(?:messagebody|text|bytes|map|value)_\\d+$", java.util.regex.Pattern.CASE_INSENSITIVE); + + /** + * Sanitizes destination names to remove Kafka Connect schema-derived suffixes. + * When Kafka Connect's IBM MQ connectors are used with schema converters (Protobuf/JSON Schema), + * union or optional fields may get index suffixes like _messagebody_0 appended to the queue name. + */ + private static String sanitizeDestinationName(String name) { + if (name == null) { + return null; + } + return KAFKA_CONNECT_SCHEMA_SUFFIX_PATTERN.matcher(name).replaceFirst(""); + } + public CharSequence toResourceName(String destinationName, boolean isQueue) { if (null == destinationName) { return isQueue ? queueTempResourceName : topicTempResourceName; @@ -229,7 +246,11 @@ public String getDestinationName(Destination destination) { } catch (Exception e) { log.debug("Unable to get jms destination name", e); } - return null != name && !name.startsWith(TIBCO_TMP_PREFIX) ? name : null; + if (null != name && !name.startsWith(TIBCO_TMP_PREFIX)) { + // Sanitize Kafka Connect schema-derived suffixes from queue/topic names + return sanitizeDestinationName(name); + } + return null; } public boolean isQueue(Destination destination) { diff --git a/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/test/groovy/JMSDecoratorTest.groovy b/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/test/groovy/JMSDecoratorTest.groovy new file mode 100644 index 00000000000..1c8f2dc0bdd --- /dev/null +++ b/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/test/groovy/JMSDecoratorTest.groovy @@ -0,0 +1,99 @@ +import datadog.trace.instrumentation.jms.JMSDecorator +import spock.lang.Specification + +import javax.jms.Queue +import javax.jms.Topic + +class JMSDecoratorTest extends Specification { + + def "test getDestinationName sanitizes Kafka Connect schema suffixes"() { + given: + def decorator = JMSDecorator.CONSUMER_DECORATE + + when: + def queue = Mock(Queue) { + getQueueName() >> rawQueueName + } + def result = decorator.getDestinationName(queue) + + then: + result == expectedName + + where: + rawQueueName | expectedName + // Customer reported issue: queue name with _messagebody_0 suffix from Kafka Connect IBM MQ connector + // See Zendesk ticket #2429181 + "trainmgt.dispatch.trnsheet.p30.v1.pub_messagebody_0" | "trainmgt.dispatch.trnsheet.p30.v1.pub" + + // Normal queue names should pass through unchanged (like customer's working pure Java apps) + "ee.wo.aei.delmove.cs" | "ee.wo.aei.delmove.cs" + "myqueue" | "myqueue" + "my.queue.name" | "my.queue.name" + + // Other Kafka Connect schema-derived suffixes should also be stripped + "myqueue_messagebody_0" | "myqueue" + "myqueue_text_0" | "myqueue" + "myqueue_bytes_0" | "myqueue" + "myqueue_map_0" | "myqueue" + "myqueue_value_0" | "myqueue" + "myqueue_MESSAGEBODY_0" | "myqueue" // case insensitive + "myqueue_MessageBody_0" | "myqueue" // case insensitive + + // Multiple digit indices + "myqueue_messagebody_10" | "myqueue" + "myqueue_messagebody_123" | "myqueue" + + // Names that look similar but shouldn't be stripped + "myqueue_messagebody" | "myqueue_messagebody" // no index + "messagebody_0_queue" | "messagebody_0_queue" // not at end + "myqueue_othersuffix_0" | "myqueue_othersuffix_0" // unknown suffix + } + + def "test getDestinationName with topic sanitizes Kafka Connect schema suffixes"() { + given: + def decorator = JMSDecorator.CONSUMER_DECORATE + + when: + def topic = Mock(Topic) { + getTopicName() >> rawTopicName + } + def result = decorator.getDestinationName(topic) + + then: + result == expectedName + + where: + rawTopicName | expectedName + "mytopic" | "mytopic" + "mytopic_messagebody_0" | "mytopic" + "mytopic_text_0" | "mytopic" + } + + def "test getDestinationName returns null for null queue name"() { + given: + def decorator = JMSDecorator.CONSUMER_DECORATE + + when: + def queue = Mock(Queue) { + getQueueName() >> null + } + def result = decorator.getDestinationName(queue) + + then: + result == null + } + + def "test getDestinationName returns null for TIBCO temp prefix"() { + given: + def decorator = JMSDecorator.CONSUMER_DECORATE + + when: + def queue = Mock(Queue) { + getQueueName() >> '$TMP$myqueue' + } + def result = decorator.getDestinationName(queue) + + then: + result == null + } +} From 5d4bd1080522ca89b707f5c926c784689e4a5933 Mon Sep 17 00:00:00 2001 From: JJ Botha Date: Fri, 9 Jan 2026 16:03:05 -0500 Subject: [PATCH 2/2] Refactor: Replace regex Pattern with string-based approach for performance Address review feedback to avoid Pattern/Matcher usage for performance reasons. Use lastIndexOf() and regionMatches() instead of regex to strip Kafka Connect schema-derived suffixes from queue names. Co-Authored-By: Claude Opus 4.5 --- .../instrumentation/jms/JMSDecorator.java | 60 +++++++++++++++---- 1 file changed, 49 insertions(+), 11 deletions(-) diff --git a/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java b/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java index 8f8744a45fa..298ee38d7cd 100644 --- a/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java +++ b/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java @@ -194,21 +194,59 @@ public void onTimeInQueue(AgentSpan span, CharSequence resourceName, String serv private static final String TIBCO_TMP_PREFIX = "$TMP$"; - // Pattern to match Kafka Connect schema-derived suffixes like _messagebody_0, _text_0, _bytes_0 - // These suffixes are added by Kafka Connect converters when handling union/optional fields - private static final java.util.regex.Pattern KAFKA_CONNECT_SCHEMA_SUFFIX_PATTERN = - java.util.regex.Pattern.compile("_(?:messagebody|text|bytes|map|value)_\\d+$", java.util.regex.Pattern.CASE_INSENSITIVE); - /** - * Sanitizes destination names to remove Kafka Connect schema-derived suffixes. - * When Kafka Connect's IBM MQ connectors are used with schema converters (Protobuf/JSON Schema), - * union or optional fields may get index suffixes like _messagebody_0 appended to the queue name. + * Sanitizes destination names to remove Kafka Connect schema-derived suffixes. When Kafka + * Connect's IBM MQ connectors are used with schema converters (Protobuf/JSON Schema), union or + * optional fields may get index suffixes like _messagebody_0 appended to the queue name. */ private static String sanitizeDestinationName(String name) { - if (name == null) { - return null; + if (name == null || name.isEmpty()) { + return name; + } + + int len = name.length(); + + // Check if name ends with digits (the schema index suffix) + if (!Character.isDigit(name.charAt(len - 1))) { + return name; + } + + // Find the underscore before the trailing digits + int underscoreBeforeDigits = name.lastIndexOf('_'); + if (underscoreBeforeDigits <= 0) { + return name; + } + + // Verify all characters after the underscore are digits + for (int i = underscoreBeforeDigits + 1; i < len; i++) { + if (!Character.isDigit(name.charAt(i))) { + return name; + } } - return KAFKA_CONNECT_SCHEMA_SUFFIX_PATTERN.matcher(name).replaceFirst(""); + + // Find the underscore before the suffix word + int underscoreBeforeSuffix = name.lastIndexOf('_', underscoreBeforeDigits - 1); + if (underscoreBeforeSuffix < 0) { + return name; + } + + // Check if the suffix word is one of our known Kafka Connect schema suffixes (case insensitive) + int suffixStart = underscoreBeforeSuffix + 1; + int suffixLen = underscoreBeforeDigits - suffixStart; + + if (isKnownKafkaConnectSuffix(name, suffixStart, suffixLen)) { + return name.substring(0, underscoreBeforeSuffix); + } + + return name; + } + + private static boolean isKnownKafkaConnectSuffix(String name, int start, int len) { + return (len == 11 && name.regionMatches(true, start, "messagebody", 0, 11)) + || (len == 4 && name.regionMatches(true, start, "text", 0, 4)) + || (len == 5 && name.regionMatches(true, start, "bytes", 0, 5)) + || (len == 3 && name.regionMatches(true, start, "map", 0, 3)) + || (len == 5 && name.regionMatches(true, start, "value", 0, 5)); } public CharSequence toResourceName(String destinationName, boolean isQueue) {