diff --git a/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java b/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java index 9655dc7c6ea..298ee38d7cd 100644 --- a/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java +++ b/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java @@ -194,6 +194,61 @@ public void onTimeInQueue(AgentSpan span, CharSequence resourceName, String serv private static final String TIBCO_TMP_PREFIX = "$TMP$"; + /** + * Sanitizes destination names to remove Kafka Connect schema-derived suffixes. When Kafka + * Connect's IBM MQ connectors are used with schema converters (Protobuf/JSON Schema), union or + * optional fields may get index suffixes like _messagebody_0 appended to the queue name. + */ + private static String sanitizeDestinationName(String name) { + if (name == null || name.isEmpty()) { + return name; + } + + int len = name.length(); + + // Check if name ends with digits (the schema index suffix) + if (!Character.isDigit(name.charAt(len - 1))) { + return name; + } + + // Find the underscore before the trailing digits + int underscoreBeforeDigits = name.lastIndexOf('_'); + if (underscoreBeforeDigits <= 0) { + return name; + } + + // Verify all characters after the underscore are digits + for (int i = underscoreBeforeDigits + 1; i < len; i++) { + if (!Character.isDigit(name.charAt(i))) { + return name; + } + } + + // Find the underscore before the suffix word + int underscoreBeforeSuffix = name.lastIndexOf('_', underscoreBeforeDigits - 1); + if (underscoreBeforeSuffix < 0) { + return name; + } + + // Check if the suffix word is one of our known Kafka Connect schema suffixes (case insensitive) + int suffixStart = underscoreBeforeSuffix + 1; + int suffixLen = underscoreBeforeDigits - suffixStart; + + if (isKnownKafkaConnectSuffix(name, suffixStart, suffixLen)) { + return name.substring(0, underscoreBeforeSuffix); + } + + return name; + } + + private static boolean isKnownKafkaConnectSuffix(String name, int start, int len) { + return (len == 11 && name.regionMatches(true, start, "messagebody", 0, 11)) + || (len == 4 && name.regionMatches(true, start, "text", 0, 4)) + || (len == 5 && name.regionMatches(true, start, "bytes", 0, 5)) + || (len == 3 && name.regionMatches(true, start, "map", 0, 3)) + || (len == 5 && name.regionMatches(true, start, "value", 0, 5)); + } + public CharSequence toResourceName(String destinationName, boolean isQueue) { if (null == destinationName) { return isQueue ? queueTempResourceName : topicTempResourceName; @@ -229,7 +284,11 @@ public String getDestinationName(Destination destination) { } catch (Exception e) { log.debug("Unable to get jms destination name", e); } - return null != name && !name.startsWith(TIBCO_TMP_PREFIX) ? name : null; + if (null != name && !name.startsWith(TIBCO_TMP_PREFIX)) { + // Sanitize Kafka Connect schema-derived suffixes from queue/topic names + return sanitizeDestinationName(name); + } + return null; } public boolean isQueue(Destination destination) { diff --git a/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/test/groovy/JMSDecoratorTest.groovy b/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/test/groovy/JMSDecoratorTest.groovy new file mode 100644 index 00000000000..1c8f2dc0bdd --- /dev/null +++ b/dd-java-agent/instrumentation/jms/javax-jms-1.1/src/test/groovy/JMSDecoratorTest.groovy @@ -0,0 +1,99 @@ +import datadog.trace.instrumentation.jms.JMSDecorator +import spock.lang.Specification + +import javax.jms.Queue +import javax.jms.Topic + +class JMSDecoratorTest extends Specification { + + def "test getDestinationName sanitizes Kafka Connect schema suffixes"() { + given: + def decorator = JMSDecorator.CONSUMER_DECORATE + + when: + def queue = Mock(Queue) { + getQueueName() >> rawQueueName + } + def result = decorator.getDestinationName(queue) + + then: + result == expectedName + + where: + rawQueueName | expectedName + // Customer reported issue: queue name with _messagebody_0 suffix from Kafka Connect IBM MQ connector + // See Zendesk ticket #2429181 + "trainmgt.dispatch.trnsheet.p30.v1.pub_messagebody_0" | "trainmgt.dispatch.trnsheet.p30.v1.pub" + + // Normal queue names should pass through unchanged (like customer's working pure Java apps) + "ee.wo.aei.delmove.cs" | "ee.wo.aei.delmove.cs" + "myqueue" | "myqueue" + "my.queue.name" | "my.queue.name" + + // Other Kafka Connect schema-derived suffixes should also be stripped + "myqueue_messagebody_0" | "myqueue" + "myqueue_text_0" | "myqueue" + "myqueue_bytes_0" | "myqueue" + "myqueue_map_0" | "myqueue" + "myqueue_value_0" | "myqueue" + "myqueue_MESSAGEBODY_0" | "myqueue" // case insensitive + "myqueue_MessageBody_0" | "myqueue" // case insensitive + + // Multiple digit indices + "myqueue_messagebody_10" | "myqueue" + "myqueue_messagebody_123" | "myqueue" + + // Names that look similar but shouldn't be stripped + "myqueue_messagebody" | "myqueue_messagebody" // no index + "messagebody_0_queue" | "messagebody_0_queue" // not at end + "myqueue_othersuffix_0" | "myqueue_othersuffix_0" // unknown suffix + } + + def "test getDestinationName with topic sanitizes Kafka Connect schema suffixes"() { + given: + def decorator = JMSDecorator.CONSUMER_DECORATE + + when: + def topic = Mock(Topic) { + getTopicName() >> rawTopicName + } + def result = decorator.getDestinationName(topic) + + then: + result == expectedName + + where: + rawTopicName | expectedName + "mytopic" | "mytopic" + "mytopic_messagebody_0" | "mytopic" + "mytopic_text_0" | "mytopic" + } + + def "test getDestinationName returns null for null queue name"() { + given: + def decorator = JMSDecorator.CONSUMER_DECORATE + + when: + def queue = Mock(Queue) { + getQueueName() >> null + } + def result = decorator.getDestinationName(queue) + + then: + result == null + } + + def "test getDestinationName returns null for TIBCO temp prefix"() { + given: + def decorator = JMSDecorator.CONSUMER_DECORATE + + when: + def queue = Mock(Queue) { + getQueueName() >> '$TMP$myqueue' + } + def result = decorator.getDestinationName(queue) + + then: + result == null + } +}