Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,61 @@ public void onTimeInQueue(AgentSpan span, CharSequence resourceName, String serv

private static final String TIBCO_TMP_PREFIX = "$TMP$";

/**
* Sanitizes destination names to remove Kafka Connect schema-derived suffixes. When Kafka
* Connect's IBM MQ connectors are used with schema converters (Protobuf/JSON Schema), union or
* optional fields may get index suffixes like _messagebody_0 appended to the queue name.
*/
private static String sanitizeDestinationName(String name) {
if (name == null || name.isEmpty()) {
return name;
}

int len = name.length();

// Check if name ends with digits (the schema index suffix)
if (!Character.isDigit(name.charAt(len - 1))) {
return name;
}

// Find the underscore before the trailing digits
int underscoreBeforeDigits = name.lastIndexOf('_');
if (underscoreBeforeDigits <= 0) {
return name;
}

// Verify all characters after the underscore are digits
for (int i = underscoreBeforeDigits + 1; i < len; i++) {
if (!Character.isDigit(name.charAt(i))) {
return name;
}
}

// Find the underscore before the suffix word
int underscoreBeforeSuffix = name.lastIndexOf('_', underscoreBeforeDigits - 1);
if (underscoreBeforeSuffix < 0) {
return name;
}

// Check if the suffix word is one of our known Kafka Connect schema suffixes (case insensitive)
int suffixStart = underscoreBeforeSuffix + 1;
int suffixLen = underscoreBeforeDigits - suffixStart;

if (isKnownKafkaConnectSuffix(name, suffixStart, suffixLen)) {
return name.substring(0, underscoreBeforeSuffix);
}

return name;
}

private static boolean isKnownKafkaConnectSuffix(String name, int start, int len) {
return (len == 11 && name.regionMatches(true, start, "messagebody", 0, 11))
|| (len == 4 && name.regionMatches(true, start, "text", 0, 4))
|| (len == 5 && name.regionMatches(true, start, "bytes", 0, 5))
|| (len == 3 && name.regionMatches(true, start, "map", 0, 3))
|| (len == 5 && name.regionMatches(true, start, "value", 0, 5));
}

public CharSequence toResourceName(String destinationName, boolean isQueue) {
if (null == destinationName) {
return isQueue ? queueTempResourceName : topicTempResourceName;
Expand Down Expand Up @@ -229,7 +284,11 @@ public String getDestinationName(Destination destination) {
} catch (Exception e) {
log.debug("Unable to get jms destination name", e);
}
return null != name && !name.startsWith(TIBCO_TMP_PREFIX) ? name : null;
if (null != name && !name.startsWith(TIBCO_TMP_PREFIX)) {
// Sanitize Kafka Connect schema-derived suffixes from queue/topic names
return sanitizeDestinationName(name);
}
return null;
}

public boolean isQueue(Destination destination) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import datadog.trace.instrumentation.jms.JMSDecorator
import spock.lang.Specification

import javax.jms.Queue
import javax.jms.Topic

class JMSDecoratorTest extends Specification {

def "test getDestinationName sanitizes Kafka Connect schema suffixes"() {
given:
def decorator = JMSDecorator.CONSUMER_DECORATE

when:
def queue = Mock(Queue) {
getQueueName() >> rawQueueName
}
def result = decorator.getDestinationName(queue)

then:
result == expectedName

where:
rawQueueName | expectedName
// Customer reported issue: queue name with _messagebody_0 suffix from Kafka Connect IBM MQ connector
// See Zendesk ticket #2429181
"trainmgt.dispatch.trnsheet.p30.v1.pub_messagebody_0" | "trainmgt.dispatch.trnsheet.p30.v1.pub"

// Normal queue names should pass through unchanged (like customer's working pure Java apps)
"ee.wo.aei.delmove.cs" | "ee.wo.aei.delmove.cs"
"myqueue" | "myqueue"
"my.queue.name" | "my.queue.name"

// Other Kafka Connect schema-derived suffixes should also be stripped
"myqueue_messagebody_0" | "myqueue"
"myqueue_text_0" | "myqueue"
"myqueue_bytes_0" | "myqueue"
"myqueue_map_0" | "myqueue"
"myqueue_value_0" | "myqueue"
"myqueue_MESSAGEBODY_0" | "myqueue" // case insensitive
"myqueue_MessageBody_0" | "myqueue" // case insensitive

// Multiple digit indices
"myqueue_messagebody_10" | "myqueue"
"myqueue_messagebody_123" | "myqueue"

// Names that look similar but shouldn't be stripped
"myqueue_messagebody" | "myqueue_messagebody" // no index
"messagebody_0_queue" | "messagebody_0_queue" // not at end
"myqueue_othersuffix_0" | "myqueue_othersuffix_0" // unknown suffix
}

def "test getDestinationName with topic sanitizes Kafka Connect schema suffixes"() {
given:
def decorator = JMSDecorator.CONSUMER_DECORATE

when:
def topic = Mock(Topic) {
getTopicName() >> rawTopicName
}
def result = decorator.getDestinationName(topic)

then:
result == expectedName

where:
rawTopicName | expectedName
"mytopic" | "mytopic"
"mytopic_messagebody_0" | "mytopic"
"mytopic_text_0" | "mytopic"
}

def "test getDestinationName returns null for null queue name"() {
given:
def decorator = JMSDecorator.CONSUMER_DECORATE

when:
def queue = Mock(Queue) {
getQueueName() >> null
}
def result = decorator.getDestinationName(queue)

then:
result == null
}

def "test getDestinationName returns null for TIBCO temp prefix"() {
given:
def decorator = JMSDecorator.CONSUMER_DECORATE

when:
def queue = Mock(Queue) {
getQueueName() >> '$TMP$myqueue'
}
def result = decorator.getDestinationName(queue)

then:
result == null
}
}
Loading