diff --git a/sdks/java/extensions/kafka-factories/src/main/java/org/apache/beam/sdk/extensions/kafka/factories/FileAwareFactoryFn.java b/sdks/java/extensions/kafka-factories/src/main/java/org/apache/beam/sdk/extensions/kafka/factories/FileAwareFactoryFn.java
index a0f15b42382d..d1fef7736602 100644
--- a/sdks/java/extensions/kafka-factories/src/main/java/org/apache/beam/sdk/extensions/kafka/factories/FileAwareFactoryFn.java
+++ b/sdks/java/extensions/kafka-factories/src/main/java/org/apache/beam/sdk/extensions/kafka/factories/FileAwareFactoryFn.java
@@ -215,7 +215,6 @@ private String replacePathWithLocal(String externalPath) throws IOException {
       throw new RuntimeException(
           "The provided external bucket could not be matched to a known source.");
     }
-
     int prefixLength = externalBucketPrefixIndex + externalBucketPrefixIdentifier.length();
     return DIRECTORY_PREFIX + "/" + factoryType + "/" + externalPath.substring(prefixLength);
   }
diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle
index afbc87f8eeba..ee06c3a80cb9 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -116,6 +116,9 @@ dependencies {
   permitUnusedDeclared library.java.hadoop_client
   provided library.java.kafka_clients
 
+  implementation project(":sdks:java:extensions:kafka-factories")
+  permitUnusedDeclared project(":sdks:java:extensions:kafka-factories")
+
   testImplementation library.java.vendored_calcite_1_40_0
   testImplementation library.java.vendored_guava_32_1_2_jre
   testImplementation library.java.junit
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.java
index 61c4c1eaa81c..a5f8ab8017fd 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTable.java
@@ -23,17 +23,21 @@
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
 import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.kafka.KafkaRecord;
 import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.commons.csv.CSVFormat;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
 /** A Kafka topic that saves records as CSV format. */
@@ -41,7 +45,12 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
   private final CSVFormat csvFormat;
 
   public BeamKafkaCSVTable(Schema beamSchema, String bootstrapServers, List<String> topics) {
-    this(beamSchema, bootstrapServers, topics, TimestampPolicyFactory.withProcessingTime());
+    this(
+        beamSchema,
+        bootstrapServers,
+        topics,
+        TimestampPolicyFactory.withProcessingTime(),
+        /*consumerFactoryFn=*/ null);
   }
 
   public BeamKafkaCSVTable(
@@ -49,7 +58,29 @@ public BeamKafkaCSVTable(
       String bootstrapServers,
       List<String> topics,
       TimestampPolicyFactory timestampPolicyFactory) {
-    this(beamSchema, bootstrapServers, topics, CSVFormat.DEFAULT, timestampPolicyFactory);
+    this(
+        beamSchema,
+        bootstrapServers,
+        topics,
+        CSVFormat.DEFAULT,
+        timestampPolicyFactory,
+        /*consumerFactoryFn=*/ null);
+  }
+
+  public BeamKafkaCSVTable(
+      Schema beamSchema,
+      String bootstrapServers,
+      List<String> topics,
+      TimestampPolicyFactory timestampPolicyFactory,
+      @Nullable
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+    this(
+        beamSchema,
+        bootstrapServers,
+        topics,
+        CSVFormat.DEFAULT,
+        timestampPolicyFactory,
+        consumerFactoryFn);
   }
 
   public BeamKafkaCSVTable(
@@ -57,8 +88,10 @@ public BeamKafkaCSVTable(
       String bootstrapServers,
       List<String> topics,
       CSVFormat format,
-      TimestampPolicyFactory timestampPolicyFactory) {
-    super(beamSchema, bootstrapServers, topics, timestampPolicyFactory);
+      TimestampPolicyFactory timestampPolicyFactory,
+      @Nullable
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+    super(beamSchema, bootstrapServers, topics, timestampPolicyFactory, consumerFactoryFn);
     this.csvFormat = format;
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index 8752ec1affe4..af36725dda5a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -26,7 +26,9 @@
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
@@ -37,6 +39,7 @@
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
@@ -63,6 +66,7 @@ public abstract class BeamKafkaTable extends SchemaBaseBeamTable {
   private TimestampPolicyFactory timestampPolicyFactory =
       TimestampPolicyFactory.withProcessingTime();
+  private SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
   private String bootstrapServers;
   private List<String> topics;
   private List<TopicPartition> topicPartitions;
 
@@ -77,35 +81,65 @@ protected BeamKafkaTable(Schema beamSchema) {
   }
 
   public BeamKafkaTable(Schema beamSchema, String bootstrapServers, List<String> topics) {
-    this(beamSchema, bootstrapServers, topics, TimestampPolicyFactory.withLogAppendTime());
+    this(
+        beamSchema,
+        bootstrapServers,
+        topics,
+        TimestampPolicyFactory.withLogAppendTime(),
+        /*consumerFactoryFn=*/ null);
   }
 
   public BeamKafkaTable(
       Schema beamSchema,
       String bootstrapServers,
       List<String> topics,
-      TimestampPolicyFactory timestampPolicyFactory) {
+      TimestampPolicyFactory timestampPolicyFactory,
+      @Nullable
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
     super(beamSchema);
     this.bootstrapServers = bootstrapServers;
     this.topics = topics;
     this.configUpdates = new HashMap<>();
     this.timestampPolicyFactory = timestampPolicyFactory;
+    this.consumerFactoryFn = consumerFactoryFn;
   }
 
   public BeamKafkaTable(
       Schema beamSchema, List<TopicPartition> topicPartitions, String bootstrapServers) {
-    this(beamSchema, topicPartitions, bootstrapServers, TimestampPolicyFactory.withLogAppendTime());
+    this(
+        beamSchema,
+        topicPartitions,
+        bootstrapServers,
+        TimestampPolicyFactory.withLogAppendTime(),
+        /*consumerFactoryFn=*/ null);
   }
 
   public BeamKafkaTable(
       Schema beamSchema,
       List<TopicPartition> topicPartitions,
       String bootstrapServers,
-      TimestampPolicyFactory timestampPolicyFactory) {
+      @Nullable
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+    this(
+        beamSchema,
+        topicPartitions,
+        bootstrapServers,
+        TimestampPolicyFactory.withLogAppendTime(),
+        consumerFactoryFn);
+  }
+
+  public BeamKafkaTable(
+      Schema beamSchema,
+      List<TopicPartition> topicPartitions,
+      String bootstrapServers,
+      TimestampPolicyFactory timestampPolicyFactory,
+      @Nullable
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
     super(beamSchema);
     this.bootstrapServers = bootstrapServers;
     this.topicPartitions = topicPartitions;
     this.timestampPolicyFactory = timestampPolicyFactory;
+    this.consumerFactoryFn = consumerFactoryFn;
   }
 
   public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
@@ -140,8 +174,10 @@ protected KafkaIO.Read<byte[], byte[]> createKafkaRead() {
               .withBootstrapServers(bootstrapServers)
               .withTopics(topics)
               .withConsumerConfigUpdates(configUpdates)
-              .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
-              .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+              .withKeyDeserializerAndCoder(
+                  ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of()))
+              .withValueDeserializerAndCoder(
+                  ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of()))
               .withTimestampPolicyFactory(timestampPolicyFactory);
     } else if (topicPartitions != null) {
       kafkaRead =
@@ -149,12 +185,18 @@ protected KafkaIO.Read<byte[], byte[]> createKafkaRead() {
               .withBootstrapServers(bootstrapServers)
               .withTopicPartitions(topicPartitions)
               .withConsumerConfigUpdates(configUpdates)
-              .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
-              .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+              .withKeyDeserializerAndCoder(
+                  ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of()))
+              .withValueDeserializerAndCoder(
+                  ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of()))
               .withTimestampPolicyFactory(timestampPolicyFactory);
     } else {
       throw new InvalidTableException("One of topics and topicPartitions must be configurated.");
     }
 
+    if (consumerFactoryFn != null) {
+      kafkaRead = kafkaRead.withConsumerFactoryFn(consumerFactoryFn);
+    }
+
     return kafkaRead;
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
index 251aebf5d1ea..efa84b012947 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTableProvider.java
@@ -28,7 +28,9 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Optional;
 import org.apache.beam.sdk.extensions.sql.TableUtils;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
@@ -39,13 +41,19 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.format.PeriodFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Kafka table provider.
@@ -72,6 +80,8 @@
  */
 @AutoService(TableProvider.class)
 public class KafkaTableProvider extends InMemoryMetaTableProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaTableProvider.class);
+
   private static class ParsedLocation {
     String brokerLocation = "";
     String topic = "";
@@ -147,6 +157,33 @@ public BeamSqlTable buildBeamSqlTable(Table table) {
       }
     }
 
+    SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFnClass =
+        null;
+    if (properties.has("consumer.factory.fn")) {
+      String consumerFactoryFnAsString = properties.get("consumer.factory.fn").asText();
+      if (consumerFactoryFnAsString.contains("KerberosConsumerFactoryFn")) {
+        if (!properties.has("consumer.factory.fn.params")
+            || !properties.get("consumer.factory.fn.params").has("krb5Location")) {
+          throw new RuntimeException(
+              "KerberosConsumerFactoryFn requires a krb5Location parameter, but none was set.");
+        }
+      }
+      try {
+        consumerFactoryFnClass =
+            InstanceBuilder.ofType(
+                    new TypeDescriptor<
+                        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>>() {})
+                .fromClassName(properties.get("consumer.factory.fn").asText())
+                .withArg(
+                    String.class,
+                    Objects.requireNonNull(
+                        properties.get("consumer.factory.fn.params").get("krb5Location").asText()))
+                .build();
+      } catch (Exception e) {
+        throw new RuntimeException("Unable to construct the ConsumerFactoryFn class.", e);
+      }
+    }
+
     BeamKafkaTable kafkaTable = null;
     if (Schemas.isNestedSchema(schema)) {
       Optional<PayloadSerializer> serializer =
@@ -158,7 +195,12 @@ public BeamSqlTable buildBeamSqlTable(Table table) {
                       TableUtils.convertNode2Map(properties)));
       kafkaTable =
           new NestedPayloadKafkaTable(
-              schema, bootstrapServers, topics, serializer, timestampPolicyFactory);
+              schema,
+              bootstrapServers,
+              topics,
+              serializer,
+              timestampPolicyFactory,
+              consumerFactoryFnClass);
     } else {
       /*
        * CSV is handled separately because multiple rows can be produced from a single message, which
@@ -168,14 +210,20 @@ public BeamSqlTable buildBeamSqlTable(Table table) {
        */
       if (payloadFormat.orElse("csv").equals("csv")) {
         kafkaTable =
-            new BeamKafkaCSVTable(schema, bootstrapServers, topics, timestampPolicyFactory);
+            new BeamKafkaCSVTable(
+                schema, bootstrapServers, topics, timestampPolicyFactory, consumerFactoryFnClass);
       } else {
         PayloadSerializer serializer =
             PayloadSerializers.getSerializer(
                 payloadFormat.get(), schema, TableUtils.convertNode2Map(properties));
         kafkaTable =
             new PayloadSerializerKafkaTable(
-                schema, bootstrapServers, topics, serializer, timestampPolicyFactory);
+                schema,
+                bootstrapServers,
+                topics,
+                serializer,
+                timestampPolicyFactory,
+                consumerFactoryFnClass);
       }
     }
 
@@ -184,11 +232,12 @@ public BeamSqlTable buildBeamSqlTable(Table table) {
     Iterator<Entry<String, JsonNode>> tableProperties = properties.fields();
     while (tableProperties.hasNext()) {
       Entry<String, JsonNode> field = tableProperties.next();
+      LOG.debug("Kafka table property: {}", field.getKey());
       if (field.getKey().startsWith("properties.")) {
         configUpdates.put(field.getKey().replace("properties.", ""), field.getValue().textValue());
       }
     }
-
+    LOG.debug("Consumer config updates from Beam SQL: {}", configUpdates);
     if (!configUpdates.isEmpty()) {
       kafkaTable.updateConsumerProperties(configUpdates);
     }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/NestedPayloadKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/NestedPayloadKafkaTable.java
index a7336dcb9670..f60286e17af3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/NestedPayloadKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/NestedPayloadKafkaTable.java
@@ -23,6 +23,7 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.kafka.KafkaRecord;
@@ -33,6 +34,7 @@
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -41,6 +43,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableListMultimap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -62,7 +65,8 @@ public NestedPayloadKafkaTable(
         bootstrapServers,
         topics,
         payloadSerializer,
-        TimestampPolicyFactory.withLogAppendTime());
+        TimestampPolicyFactory.withLogAppendTime(),
+        /*consumerFactoryFn=*/ null);
   }
 
   public NestedPayloadKafkaTable(
@@ -71,7 +75,24 @@ public NestedPayloadKafkaTable(
       List<String> topics,
       Optional<PayloadSerializer> payloadSerializer,
       TimestampPolicyFactory timestampPolicyFactory) {
-    super(beamSchema, bootstrapServers, topics, timestampPolicyFactory);
+    this(
+        beamSchema,
+        bootstrapServers,
+        topics,
+        payloadSerializer,
+        timestampPolicyFactory,
+        /*consumerFactoryFn=*/ null);
+  }
+
+  public NestedPayloadKafkaTable(
+      Schema beamSchema,
+      String bootstrapServers,
+      List<String> topics,
+      Optional<PayloadSerializer> payloadSerializer,
+      TimestampPolicyFactory timestampPolicyFactory,
+      @Nullable
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+    super(beamSchema, bootstrapServers, topics, timestampPolicyFactory, consumerFactoryFn);
     checkArgument(Schemas.isNestedSchema(schema));
     Schemas.validateNestedSchema(schema);
     this.payloadSerializer = payloadSerializer;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/PayloadSerializerKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/PayloadSerializerKafkaTable.java
index 8bbe91026c41..015156b697d6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/PayloadSerializerKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/PayloadSerializerKafkaTable.java
@@ -18,16 +18,20 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
 
 import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.kafka.KafkaRecord;
 import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
 public class PayloadSerializerKafkaTable extends BeamKafkaTable {
@@ -38,8 +42,10 @@ public class PayloadSerializerKafkaTable extends BeamKafkaTable {
       String bootstrapServers,
       List<String> topics,
       PayloadSerializer serializer,
-      TimestampPolicyFactory timestampPolicyFactory) {
-    super(requiredSchema, bootstrapServers, topics, timestampPolicyFactory);
+      TimestampPolicyFactory timestampPolicyFactory,
+      @Nullable
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+    super(requiredSchema, bootstrapServers, topics, timestampPolicyFactory, consumerFactoryFn);
     this.serializer = serializer;
   }
 
diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle
index f1366817db22..dbd6e279846b 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -76,6 +76,8 @@ dependencies {
   permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761
   implementation project(":sdks:java:io:kafka:upgrade")
  permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761
+  implementation project(":sdks:java:extensions:kafka-factories")
+  permitUnusedDeclared project(":sdks:java:extensions:kafka-factories")
 
   if (JavaVersion.current().compareTo(JavaVersion.VERSION_11) >= 0 && project.findProperty('testJavaVersion') != '8') {
     // iceberg ended support for Java 8 in 1.7.0
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 568fe49217b3..48e4ae2317ac 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -35,6 +35,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -94,6 +95,7 @@
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.construction.PTransformMatchers;
 import org.apache.beam.sdk.util.construction.ReplacementOutputs;
@@ -930,6 +932,34 @@ static void setupExternalBuilder(
         builder.setOffsetDeduplication(false);
         builder.setRedistributeByRecordKey(false);
       }
+
+      if (config.consumerFactoryFnClass != null) {
+        if (config.consumerFactoryFnClass.contains("KerberosConsumerFactoryFn")) {
+          try {
+            if (!config.consumerFactoryFnParams.containsKey("krb5Location")) {
+              throw new IllegalArgumentException(
+                  "The KerberosConsumerFactoryFn requires a location for the krb5.conf file. "
" + + "Please provide either a GCS location or Google Secret Manager location for this file."); + } + String krb5Location = config.consumerFactoryFnParams.get("krb5Location"); + builder.setConsumerFactoryFn( + InstanceBuilder.ofType( + new TypeDescriptor< + SerializableFunction< + Map, Consumer>>() {}) + .fromClassName(config.consumerFactoryFnClass) + .withArg(String.class, Objects.requireNonNull(krb5Location)) + .build()); + } catch (Exception e) { + throw new RuntimeException( + "Unable to construct FactoryFn " + + config.consumerFactoryFnClass + + ": " + + e.getMessage(), + e); + } + } + } } private static Coder resolveCoder(Class> deserializer) { @@ -1000,6 +1030,8 @@ public static class Configuration { private Boolean offsetDeduplication; private Boolean redistributeByRecordKey; private Long dynamicReadPollIntervalSeconds; + private String consumerFactoryFnClass; + private Map consumerFactoryFnParams; public void setConsumerConfig(Map consumerConfig) { this.consumerConfig = consumerConfig; @@ -1068,6 +1100,14 @@ public void setRedistributeByRecordKey(Boolean redistributeByRecordKey) { public void setDynamicReadPollIntervalSeconds(Long dynamicReadPollIntervalSeconds) { this.dynamicReadPollIntervalSeconds = dynamicReadPollIntervalSeconds; } + + public void setConsumerFactoryFnClass(String consumerFactoryFnClass) { + this.consumerFactoryFnClass = consumerFactoryFnClass; + } + + public void setConsumerFactoryFnParams(Map consumerFactoryFnParams) { + this.consumerFactoryFnParams = consumerFactoryFnParams; + } } } diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py index f3e6c39cfda4..dd2d245e67cd 100644 --- a/sdks/python/apache_beam/io/kafka.py +++ b/sdks/python/apache_beam/io/kafka.py @@ -100,6 +100,7 @@ # pytype: skip-file +import collections import typing import numpy as np @@ -110,22 +111,21 @@ ReadFromKafkaSchema = typing.NamedTuple( 'ReadFromKafkaSchema', - [ - ('consumer_config', typing.Mapping[str, str]), - ('topics', typing.List[str]), - ('key_deserializer', str), - ('value_deserializer', str), - ('start_read_time', typing.Optional[int]), - ('max_num_records', typing.Optional[int]), - ('max_read_time', typing.Optional[int]), - ('commit_offset_in_finalize', bool), - ('timestamp_policy', str), - ('consumer_polling_timeout', typing.Optional[int]), - ('redistribute', typing.Optional[bool]), - ('redistribute_num_keys', typing.Optional[np.int32]), - ('allow_duplicates', typing.Optional[bool]), - ('dynamic_read_poll_interval_seconds', typing.Optional[int]), - ]) + [('consumer_config', typing.Mapping[str, str]), + ('topics', typing.List[str]), ('key_deserializer', str), + ('value_deserializer', str), ('start_read_time', typing.Optional[int]), + ('max_num_records', typing.Optional[int]), + ('max_read_time', typing.Optional[int]), + ('commit_offset_in_finalize', bool), ('timestamp_policy', str), + ('consumer_polling_timeout', typing.Optional[int]), + ('redistribute', typing.Optional[bool]), + ('redistribute_num_keys', typing.Optional[np.int32]), + ('allow_duplicates', typing.Optional[bool]), + ('dynamic_read_poll_interval_seconds', typing.Optional[int]), + ('consumer_factory_fn_class', typing.Optional[str]), + ( + 'consumer_factory_fn_params', + typing.Optional[collections.abc.Mapping[str, str]])]) def default_io_expansion_service(append_args=None): @@ -173,7 +173,10 @@ def __init__( redistribute_num_keys=np.int32(0), allow_duplicates=False, dynamic_read_poll_interval_seconds: typing.Optional[int] = None, - ): + consumer_factory_fn_class: typing.Optional[ 
+ collections.abc.Mapping] = None, + consumer_factory_fn_params: typing.Optional[ + collections.abc.Mapping] = None): """ Initializes a read operation from Kafka. @@ -216,6 +219,13 @@ def __init__( :param dynamic_read_poll_interval_seconds: The interval in seconds at which to check for new partitions. If not None, dynamic partition discovery is enabled. + :param consumer_factory_fn_class: A fully qualified classpath to an + existing provided consumerFactoryFn. If not None, this will construct + Kafka consumers with a custom configuration. + :param consumer_factory_fn_params: A map which specifies the parameters for + the provided consumer_factory_fn_class. If not None, the values in this + map will be used when constructing the consumer_factory_fn_class object. + This cannot be null if the consumer_factory_fn_class is not null. """ if timestamp_policy not in [ReadFromKafka.processing_time_policy, ReadFromKafka.create_time_policy, @@ -242,7 +252,9 @@ def __init__( redistribute_num_keys=redistribute_num_keys, allow_duplicates=allow_duplicates, dynamic_read_poll_interval_seconds= - dynamic_read_poll_interval_seconds)), + dynamic_read_poll_interval_seconds, + consumer_factory_fn_class=consumer_factory_fn_class, + consumer_factory_fn_params=consumer_factory_fn_params)), expansion_service or default_io_expansion_service()) diff --git a/settings.gradle.kts b/settings.gradle.kts index 97facd1e3918..7161d0c87af5 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -18,14 +18,14 @@ import com.gradle.enterprise.gradleplugin.internal.extension.BuildScanExtensionWithHiddenFeatures pluginManagement { - plugins { - id("org.javacc.javacc") version "3.0.3" // enable the JavaCC parser generator - } + plugins { + id("org.javacc.javacc") version "3.0.3" // enable the JavaCC parser generator + } } plugins { - id("com.gradle.develocity") version "3.19" - id("com.gradle.common-custom-user-data-gradle-plugin") version "2.4.0" + id("com.gradle.develocity") version "3.19" + id("com.gradle.common-custom-user-data-gradle-plugin") version "2.2.1" } @@ -36,32 +36,32 @@ val isGithubActionsBuild = arrayOf("GITHUB_REPOSITORY", "GITHUB_RUN_ID").all { S val isCi = isJenkinsBuild || isGithubActionsBuild develocity { - server = "https://develocity.apache.org" - projectId = "beam" + server = "https://develocity.apache.org" + projectId = "beam" - buildScan { - uploadInBackground = !isCi - publishing.onlyIf { it.isAuthenticated } - obfuscation { - ipAddresses { addresses -> addresses.map { "0.0.0.0" } } + buildScan { + uploadInBackground = !isCi + publishing.onlyIf { it.isAuthenticated } + obfuscation { + ipAddresses { addresses -> addresses.map { "0.0.0.0" } } + } } - } } buildCache { - local { - isEnabled = true - } - remote { - url = uri("https://beam-cache.apache.org/cache/") - isAllowUntrustedServer = false - credentials { - username = System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME") - password = System.getenv("GRADLE_ENTERPRISE_CACHE_PASSWORD") + local { + isEnabled = true + } + remote { + url = uri("https://beam-cache.apache.org/cache/") + isAllowUntrustedServer = false + credentials { + username = System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME") + password = System.getenv("GRADLE_ENTERPRISE_CACHE_PASSWORD") + } + isEnabled = !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank() + isPush = isCi && !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank() } - isEnabled = !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank() - isPush = isCi && 
!System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank() - } } rootProject.name = "beam" @@ -246,6 +246,7 @@ include(":sdks:java:io:json") include(":sdks:java:io:kafka") include(":sdks:java:io:kafka:jmh") include(":sdks:java:io:kafka:upgrade") +include(":sdks:java:io:kafka:file-aware-factories") include(":sdks:java:io:kudu") include(":sdks:java:io:mongodb") include(":sdks:java:io:mqtt")