 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.construction.PTransformMatchers;
@@ -655,14 +657,6 @@ public static <K, V> WriteRecords<K, V> writeRecords() {
 
   ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
 
-  /**
-   * Default number of keys to redistribute Kafka inputs into.
-   *
-   * <p>This value is used when {@link Read#withRedistribute()} is used without {@link
-   * Read#withRedistributeNumKeys(int redistributeNumKeys)}.
-   */
-  private static final int DEFAULT_REDISTRIBUTE_NUM_KEYS = 32768;
-
   /**
    * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more information on
    * usage and configuration.
@@ -921,6 +915,25 @@ static <K, V> void setupExternalBuilder(
         builder.setAllowDuplicates(false);
         builder.setOffsetDeduplication(false);
       }
+
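+      // Wire up an externally configured consumer factory. Only a Kerberos-aware factory
+      // is recognized here, and it requires the location of a krb5.conf file, supplied
+      // through the "krb5Location" parameter.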
+      if (config.consumerFactoryFnClass != null) {
+        if (config.consumerFactoryFnClass.contains("KerberosConsumerFactoryFn")) {
+          try {
+            // Validate the parameters, not the class name: the krb5.conf location must be
+            // present in the factory parameters map.
+            if (config.consumerFactoryFnParams == null
+                || !config.consumerFactoryFnParams.containsKey("krb5Location")) {
+              throw new IllegalArgumentException(
+                  "The KerberosConsumerFactoryFn requires a location for the krb5.conf file. "
+                      + "Please provide either a GCS location or Google Secret Manager location for this file.");
+            }
+            builder.setConsumerFactoryFn(
+                InstanceBuilder.ofType(
+                        new TypeDescriptor<
+                            SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>>() {})
+                    .fromClassName(config.consumerFactoryFnClass)
+                    .withArg(String.class, config.consumerFactoryFnParams.get("krb5Location"))
+                    .build());
+          } catch (Exception e) {
+            throw new RuntimeException(
+                "Unable to construct FactoryFn " + config.consumerFactoryFnClass + ": " + e.getMessage(),
+                e);
+          }
+        }
+      }
     }
 
     private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -990,6 +1003,8 @@ public static class Configuration {
       private Boolean allowDuplicates;
       private Boolean offsetDeduplication;
       private Long dynamicReadPollIntervalSeconds;
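+      // Optional consumer factory override used by the external (cross-language) builder:
+      // the class name of a SerializableFunction that creates the Kafka consumer, plus its
+      // string parameters (for Kerberos, the "krb5Location" entry).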
+      private String consumerFactoryFnClass;
+      private Map<String, String> consumerFactoryFnParams;
 
       public void setConsumerConfig(Map<String, String> consumerConfig) {
         this.consumerConfig = consumerConfig;
@@ -1054,6 +1069,14 @@ public void setOffsetDeduplication(Boolean offsetDeduplication) {
       public void setDynamicReadPollIntervalSeconds(Long dynamicReadPollIntervalSeconds) {
         this.dynamicReadPollIntervalSeconds = dynamicReadPollIntervalSeconds;
       }
+
+      public void setConsumerFactoryFnClass(String consumerFactoryFnClass) {
+        this.consumerFactoryFnClass = consumerFactoryFnClass;
+      }
+
+      public void setConsumerFactoryFnParams(Map<String, String> consumerFactoryFnParams) {
+        this.consumerFactoryFnParams = consumerFactoryFnParams;
+      }
     }
   }
 
@@ -1103,60 +1126,19 @@ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
 
     /**
      * Sets redistribute transform that hints to the runner to try to redistribute the work evenly.
-     *
-     * @return an updated {@link Read} transform.
      */
     public Read<K, V> withRedistribute() {
-      Builder<K, V> builder = toBuilder().setRedistributed(true);
-      if (getRedistributeNumKeys() == 0) {
-        builder = builder.setRedistributeNumKeys(DEFAULT_REDISTRIBUTE_NUM_KEYS);
-      }
-      return builder.build();
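+      // Note: redistributing no longer implies a default number of keys; callers that want
+      // bucketing into a fixed number of keys must call withRedistributeNumKeys explicitly.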
+      return toBuilder().setRedistributed(true).build();
     }
 
-    /**
-     * Hints to the runner that it can relax exactly-once processing guarantees, allowing duplicates
-     * in at-least-once processing mode of Kafka inputs.
-     *
-     * <p>Must be used with {@link KafkaIO#withRedistribute()}.
-     *
-     * <p>Not compatible with {@link KafkaIO#withOffsetDeduplication()}.
-     *
-     * @param allowDuplicates specifies whether to allow duplicates.
-     * @return an updated {@link Read} transform.
-     */
     public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
       return toBuilder().setAllowDuplicates(allowDuplicates).build();
     }
 
-    /**
-     * Redistributes Kafka messages into a distinct number of keys for processing in subsequent
-     * steps.
-     *
-     * <p>If unset, defaults to {@link KafkaIO#DEFAULT_REDISTRIBUTE_NUM_KEYS}.
-     *
-     * <p>Use zero to disable bucketing into a distinct number of keys.
-     *
-     * <p>Must be used with {@link Read#withRedistribute()}.
-     *
-     * @param redistributeNumKeys specifies the total number of keys for redistributing inputs.
-     * @return an updated {@link Read} transform.
-     */
     public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
       return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
     }
 
-    /**
-     * Hints to the runner to optimize the redistribute by minimizing the amount of data required
-     * for persistence as part of the redistribute operation.
-     *
-     * <p>Must be used with {@link KafkaIO#withRedistribute()}.
-     *
-     * <p>Not compatible with {@link KafkaIO#withAllowDuplicates()}.
-     *
-     * @param offsetDeduplication specifies whether to enable offset-based deduplication.
-     * @return an updated {@link Read} transform.
-     */
     public Read<K, V> withOffsetDeduplication(Boolean offsetDeduplication) {
       return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
     }
@@ -1634,8 +1616,6 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       final KafkaIOReadImplementationCompatibilityResult compatibility =
           KafkaIOReadImplementationCompatibility.getCompatibility(this);
 
-      Read<K, V> kafkaRead = deduplicateTopics(this);
-
       // For a number of cases, we prefer using the UnboundedSource Kafka over the new SDF-based
       // Kafka source, for example,
       // * Experiments 'beam_fn_api_use_deprecated_read' and use_deprecated_read will result in
@@ -1652,9 +1632,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
           || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
           || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
               && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
-        return input.apply(new ReadFromKafkaViaUnbounded<>(kafkaRead, keyCoder, valueCoder));
+        return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
       }
-      return input.apply(new ReadFromKafkaViaSDF<>(kafkaRead, keyCoder, valueCoder));
+      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
     }
 
     private void checkRedistributeConfiguration() {
@@ -1670,14 +1650,10 @@ private void checkRedistributeConfiguration() {
             isRedistributed(),
             "withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform.");
       }
-      if (getOffsetDeduplication() != null && getOffsetDeduplication() && isRedistributed()) {
+      if (getOffsetDeduplication() != null && getOffsetDeduplication()) {
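+        // Offset deduplication is now rejected outright unless redistribute is enabled and
+        // duplicates are disallowed, instead of logging a warning for the unsupported case.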
         checkState(
-            !isAllowDuplicates(),
-            "withOffsetDeduplication and withRedistribute can only be used when withAllowDuplicates is set to false.");
-      }
-      if (getOffsetDeduplication() != null && getOffsetDeduplication() && !isRedistributed()) {
-        LOG.warn(
-            "Offsets used for deduplication are available in WindowedValue's metadata. Combining, aggregating, mutating them may risk with data loss.");
+            isRedistributed() && !isAllowDuplicates(),
+            "withOffsetDeduplication should only be used with withRedistribute and withAllowDuplicates(false).");
       }
     }
 
@@ -1705,29 +1681,6 @@ private void warnAboutUnsafeConfigurations(PBegin input) {
       }
     }
 
-    private Read<K, V> deduplicateTopics(Read<K, V> kafkaRead) {
-      final List<String> topics = getTopics();
-      if (topics != null && !topics.isEmpty()) {
-        final List<String> distinctTopics = topics.stream().distinct().collect(Collectors.toList());
-        if (topics.size() == distinctTopics.size()) {
-          return kafkaRead;
-        }
-        return kafkaRead.toBuilder().setTopics(distinctTopics).build();
-      }
-
-      final List<TopicPartition> topicPartitions = getTopicPartitions();
-      if (topicPartitions != null && !topicPartitions.isEmpty()) {
-        final List<TopicPartition> distinctTopicPartitions =
-            topicPartitions.stream().distinct().collect(Collectors.toList());
-        if (topicPartitions.size() == distinctTopicPartitions.size()) {
-          return kafkaRead;
-        }
-        return kafkaRead.toBuilder().setTopicPartitions(distinctTopicPartitions).build();
-      }
-
-      return kafkaRead;
-    }
-
     // This class is designed to mimic the Flink pipeline options, so we can check for the
     // checkpointingInterval property, but without needing to depend on the Flink runner
     // Do not use this
@@ -1845,18 +1798,13 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
                 .withMaxReadTime(kafkaRead.getMaxReadTime())
                 .withMaxNumRecords(kafkaRead.getMaxNumRecords());
       }
-      PCollection<KafkaRecord<K, V>> output = input.getPipeline().apply(transform);
-      if (kafkaRead.getOffsetDeduplication() != null && kafkaRead.getOffsetDeduplication()) {
-        output =
-            output.apply(
-                "Insert Offset for offset deduplication",
-                ParDo.of(new OffsetDeduplicationIdExtractor<>()));
-      }
+
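+      // The offset-extraction step is removed; the unbounded read is applied directly,
+      // and its output is redistributed below when withRedistribute() is enabled.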
       if (kafkaRead.isRedistributed()) {
         if (kafkaRead.isCommitOffsetsInFinalizeEnabled() && kafkaRead.isAllowDuplicates()) {
           LOG.warn(
               "Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled");
         }
+        PCollection<KafkaRecord<K, V>> output = input.getPipeline().apply(transform);
 
         if (kafkaRead.getRedistributeNumKeys() == 0) {
           return output.apply(
@@ -1871,7 +1819,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
                   .withNumBuckets((int) kafkaRead.getRedistributeNumKeys()));
         }
       }
-      return output;
+      return input.getPipeline().apply(transform);
     }
   }
 
@@ -1944,7 +1892,6 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .as(StreamingOptions.class)
               .getUpdateCompatibilityVersion();
       if (requestedVersionString != null
-          && !requestedVersionString.isEmpty()
           && TransformUpgrader.compareVersions(requestedVersionString, "2.66.0") < 0) {
         // Use discouraged Impulse for backwards compatibility with previous released versions.
         output =
@@ -1980,29 +1927,6 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
     }
   }
 
-  static class OffsetDeduplicationIdExtractor<K, V>
-      extends DoFn<KafkaRecord<K, V>, KafkaRecord<K, V>> {
-
-    @ProcessElement
-    public void processElement(ProcessContext pc) {
-      KafkaRecord<K, V> element = pc.element();
-      Long offset = null;
-      String uniqueId = null;
-      if (element != null) {
-        offset = element.getOffset();
-        uniqueId =
-            (String.format("%s-%d-%d", element.getTopic(), element.getPartition(), offset));
-      }
-      pc.outputWindowedValue(
-          element,
-          pc.timestamp(),
-          Lists.newArrayList(GlobalWindow.INSTANCE),
-          pc.pane(),
-          uniqueId,
-          offset);
-    }
-  }
-
   /**
    * A DoFn which generates {@link KafkaSourceDescriptor} based on the configuration of {@link
    * Read}.
@@ -2680,30 +2604,13 @@ public ReadSourceDescriptors<K, V> withProcessingTime() {
 
     /** Enable Redistribute. */
     public ReadSourceDescriptors<K, V> withRedistribute() {
-      Builder<K, V> builder = toBuilder().setRedistribute(true);
-      if (getRedistributeNumKeys() == 0) {
-        builder = builder.setRedistributeNumKeys(DEFAULT_REDISTRIBUTE_NUM_KEYS);
-      }
-      return builder.build();
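+      // As with Read#withRedistribute(), no default key count is applied; use
+      // withRedistributeNumKeys to bucket into a fixed number of keys.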
+      return toBuilder().setRedistribute(true).build();
     }
 
     public ReadSourceDescriptors<K, V> withAllowDuplicates() {
       return toBuilder().setAllowDuplicates(true).build();
     }
 
-    /**
-     * Redistributes Kafka messages into a distinct number of keys for processing in subsequent
-     * steps.
-     *
-     * <p>If unset, defaults to {@link KafkaIO#DEFAULT_REDISTRIBUTE_NUM_KEYS}.
-     *
-     * <p>Use zero to disable bucketing into a distinct number of keys.
-     *
-     * <p>Must be used with {@link ReadSourceDescriptors#withRedistribute()}.
-     *
-     * @param redistributeNumKeys specifies the total number of keys for redistributing inputs.
-     * @return an updated {@link Read} transform.
-     */
     public ReadSourceDescriptors<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
       return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
     }
@@ -2957,7 +2864,6 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
               .as(StreamingOptions.class)
               .getUpdateCompatibilityVersion();
       if (requestedVersionString != null
-          && !requestedVersionString.isEmpty()
           && TransformUpgrader.compareVersions(requestedVersionString, "2.60.0") < 0) {
         // Redistribute is not allowed with commits prior to 2.59.0, since there is a Reshuffle
         // prior to the redistribute. The reshuffle will occur before commits are offsetted and