diff --git a/pom.xml b/pom.xml
index d085dd4..b8555fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,14 +7,20 @@
com.viveknaskar
dataflow-redis-example
- 1.0.1
+ 1.0.2
UTF-8
- 3.7.0
- 1.6.0
- 1.7.25
- 2.26.0
+ 3.12.1
+ 3.1.0
+ 2.0.12
+ 2.56.0
+ 1.1.1
+ 1.10.4
+ 5.2.0
+ 4.13.2
+ 1.3
+ 2.0.13
@@ -38,26 +44,10 @@
maven-compiler-plugin
${maven-compiler-plugin.version}
- 11
- 11
+ 17
+ 17
-
-
-
-
@@ -115,34 +105,34 @@
com.github.fppt
jedis-mock
- 0.1.16
+ ${jedis-mock.version}
com.google.auto.value
auto-value
- 1.7.4
+ ${auto-value.version}
provided
org.mockito
- mockito-core
- 2.18.0
+ mockito-inline
+ ${mockito.version}
test
junit
junit
- 4.13
+ ${junit.version}
test
org.hamcrest
hamcrest-all
- 1.3
+ ${hamcrest.version}
test
@@ -155,7 +145,7 @@
org.slf4j
slf4j-jdk14
- ${slf4j.version}
+ ${slf4j-jdk14.version}
diff --git a/src/main/java/com/viveknaskar/StarterPipeline.java b/src/main/java/com/viveknaskar/StarterPipeline.java
index b814c7c..b69bd14 100644
--- a/src/main/java/com/viveknaskar/StarterPipeline.java
+++ b/src/main/java/com/viveknaskar/StarterPipeline.java
@@ -35,7 +35,7 @@ public static void main(String[] args) {
* Flushing the Memorystore if there are records in the input file
*/
PCollection flushFlag = lines.apply("Checking Data in input file", Count.globally())
- .apply("Flushing the data store", FlushingMemorystore.read()
+ .apply("Flushing the data store", FlushingMemoryStore.read()
.withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));
@@ -110,7 +110,7 @@ public static void main(String[] args) {
ppidDataSet.apply(Wait.on(flushFlag))
.apply("Creating PPID index",
- RedisHashIO.write().withConnectionConfiguration(RedisConnectionConfiguration
+ WritingInMemoryStore.write().withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));
p.run();
diff --git a/src/main/java/com/viveknaskar/functions/FlushingMemorystore.java b/src/main/java/com/viveknaskar/functions/FlushingMemoryStore.java
similarity index 65%
rename from src/main/java/com/viveknaskar/functions/FlushingMemorystore.java
rename to src/main/java/com/viveknaskar/functions/FlushingMemoryStore.java
index 7e6fcd8..0d46584 100644
--- a/src/main/java/com/viveknaskar/functions/FlushingMemorystore.java
+++ b/src/main/java/com/viveknaskar/functions/FlushingMemoryStore.java
@@ -1,24 +1,26 @@
package com.viveknaskar.functions;
-import org.checkerframework.checker.nullness.qual.Nullable;
import com.google.auto.value.AutoValue;
+import com.google.common.base.Preconditions;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.Transaction;
-public class FlushingMemorystore extends DoFn {
+import javax.annotation.Nullable;
+import java.util.Objects;
- private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);
+public class FlushingMemoryStore extends DoFn {
- public static FlushingMemorystore.Read read() {
- return (new AutoValue_FlushingMemorystore_Read.Builder())
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemoryStore.class);
+
+ public static FlushingMemoryStore.Read read() {
+ return (new AutoValue_FlushingMemoryStore_Read.Builder())
.setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
}
@@ -33,30 +35,30 @@ public Read() {
@Nullable
abstract Long expireTime();
- abstract FlushingMemorystore.Read.Builder toBuilder();
+ abstract FlushingMemoryStore.Read.Builder toBuilder();
- public FlushingMemorystore.Read withEndpoint(String host, int port) {
+ public FlushingMemoryStore.Read withEndpoint(String host, int port) {
Preconditions.checkArgument(host != null, "host cannot be null");
Preconditions.checkArgument(port > 0, "port cannot be negative or 0");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
}
- public FlushingMemorystore.Read withAuth(String auth) {
+ public FlushingMemoryStore.Read withAuth(String auth) {
Preconditions.checkArgument(auth != null, "auth cannot be null");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
}
- public FlushingMemorystore.Read withTimeout(int timeout) {
+ public FlushingMemoryStore.Read withTimeout(int timeout) {
Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
}
- public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
+ public FlushingMemoryStore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
}
- public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
+ public FlushingMemoryStore.Read withExpireTime(Long expireTimeMillis) {
Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null");
Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0");
return this.toBuilder().setExpireTime(expireTimeMillis).build();
@@ -64,35 +66,34 @@ public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
public PCollection expand(PCollection input) {
Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
- return input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
+ return input.apply(ParDo.of(new FlushingMemoryStore.Read.ReadFn(this)));
}
@Setup
public Jedis setup() {
- return this.connectionConfiguration().connect();
+ return Objects.requireNonNull(this.connectionConfiguration()).connect();
}
private static class ReadFn extends DoFn {
private static final int DEFAULT_BATCH_SIZE = 1000;
- private final FlushingMemorystore.Read spec;
+ private final FlushingMemoryStore.Read spec;
private transient Jedis jedis;
- private transient Pipeline pipeline;
+ private transient @Nullable Transaction transaction;
private int batchCount;
- public ReadFn(FlushingMemorystore.Read spec) {
+ public ReadFn(FlushingMemoryStore.Read spec) {
this.spec = spec;
}
@Setup
public void setup() {
- this.jedis = this.spec.connectionConfiguration().connect();
+ this.jedis = Objects.requireNonNull(this.spec.connectionConfiguration()).connect();
}
@StartBundle
public void startBundle() {
- this.pipeline = this.jedis.pipelined();
- this.pipeline.multi();
- this.batchCount = 0;
+ transaction = jedis.multi();
+ batchCount = 0;
}
@ProcessElement
@@ -100,12 +101,8 @@ public void processElement(@Element Long count, OutputReceiver out) {
batchCount++;
if(count!=null && count > 0) {
- if (pipeline.isInMulti()) {
- pipeline.exec();
- pipeline.sync();
- jedis.flushDB();
- LOGGER.info("*****The memorystore is flushed*****");
- }
+ jedis.flushDB();
+ LOGGER.info("*****The Memorystore is flushed*****");
out.output("SUCCESS");
} else {
LOGGER.info("No Records are there in the input file");
@@ -116,11 +113,14 @@ public void processElement(@Element Long count, OutputReceiver out) {
@FinishBundle
public void finishBundle() {
- if (this.pipeline.isInMulti()) {
- this.pipeline.exec();
- this.pipeline.sync();
+ if (batchCount > 0) {
+ transaction.exec();
+ }
+ if (transaction != null) {
+ transaction.close();
}
- this.batchCount=0;
+ transaction = null;
+ batchCount = 0;
}
@Teardown
@@ -136,11 +136,11 @@ abstract static class Builder {
Builder() {
}
- abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);
+ abstract FlushingMemoryStore.Read.Builder setExpireTime(Long expireTimeMillis);
- abstract FlushingMemorystore.Read build();
+ abstract FlushingMemoryStore.Read build();
- abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);
+ abstract FlushingMemoryStore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);
}
diff --git a/src/main/java/com/viveknaskar/functions/RedisHashIO.java b/src/main/java/com/viveknaskar/functions/WritingInMemoryStore.java
similarity index 69%
rename from src/main/java/com/viveknaskar/functions/RedisHashIO.java
rename to src/main/java/com/viveknaskar/functions/WritingInMemoryStore.java
index 4c62f2c..2935d5f 100644
--- a/src/main/java/com/viveknaskar/functions/RedisHashIO.java
+++ b/src/main/java/com/viveknaskar/functions/WritingInMemoryStore.java
@@ -1,7 +1,7 @@
package com.viveknaskar.functions;
-import org.checkerframework.checker.nullness.qual.Nullable;
import com.google.auto.value.AutoValue;
+import com.google.common.base.Preconditions;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -9,24 +9,21 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.Pipeline;
-
-public class RedisHashIO {
+import redis.clients.jedis.Transaction;
- public static RedisHashIO.Write write() {
+public class WritingInMemoryStore {
- return (new AutoValue_RedisHashIO_Write.Builder())
+ public static WritingInMemoryStore.Write write() {
+ return (new AutoValue_WritingInMemoryStore_Write.Builder())
.setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
-
}
@AutoValue
public abstract static class Write extends PTransform>>, PDone> {
- public Write() {
- }
+ public Write() {}
@Nullable
abstract RedisConnectionConfiguration connectionConfiguration();
@@ -34,30 +31,30 @@ public Write() {
@Nullable
abstract Long expireTime();
- abstract RedisHashIO.Write.Builder toBuilder();
+ abstract WritingInMemoryStore.Write.Builder toBuilder();
- public RedisHashIO.Write withEndpoint(String host, int port) {
+ public WritingInMemoryStore.Write withEndpoint(String host, int port) {
Preconditions.checkArgument(host != null, "host cannot be null");
Preconditions.checkArgument(port > 0, "port cannot be negative or 0");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
}
- public RedisHashIO.Write withAuth(String auth) {
+ public WritingInMemoryStore.Write withAuth(String auth) {
Preconditions.checkArgument(auth != null, "auth cannot be null");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
}
- public RedisHashIO.Write withTimeout(int timeout) {
+ public WritingInMemoryStore.Write withTimeout(int timeout) {
Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
}
- public RedisHashIO.Write withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
+ public WritingInMemoryStore.Write withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
}
- public RedisHashIO.Write withExpireTime(Long expireTimeMillis) {
+ public WritingInMemoryStore.Write withExpireTime(Long expireTimeMillis) {
Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null");
Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0");
return this.toBuilder().setExpireTime(expireTimeMillis).build();
@@ -65,18 +62,18 @@ public RedisHashIO.Write withExpireTime(Long expireTimeMillis) {
public PDone expand(PCollection>> input) {
Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
- input.apply(ParDo.of(new RedisHashIO.Write.WriteFn(this)));
+ input.apply(ParDo.of(new WritingInMemoryStore.Write.WriteFn(this)));
return PDone.in(input.getPipeline());
}
private static class WriteFn extends DoFn>, Void> {
private static final int DEFAULT_BATCH_SIZE = 1000;
- private final RedisHashIO.Write spec;
+ private final WritingInMemoryStore.Write spec;
private transient Jedis jedis;
- private transient Pipeline pipeline;
+ private transient @Nullable Transaction transaction;
private int batchCount;
- public WriteFn(RedisHashIO.Write spec) {
+ public WriteFn(WritingInMemoryStore.Write spec) {
this.spec = spec;
}
@@ -87,9 +84,8 @@ public void setup() {
@StartBundle
public void startBundle() {
- this.pipeline = this.jedis.pipelined();
- this.pipeline.multi();
- this.batchCount = 0;
+ transaction = jedis.multi();
+ batchCount = 0;
}
@ProcessElement
@@ -101,9 +97,8 @@ public void processElement(DoFn>, Void>.ProcessCon
batchCount++;
if (batchCount >= DEFAULT_BATCH_SIZE) {
- pipeline.exec();
- pipeline.sync();
- pipeline.multi();
+ transaction.exec();
+ transaction.multi();
batchCount = 0;
}
}
@@ -114,17 +109,20 @@ private void writeRecord(KV> record) {
String fieldKey = hashValue.getKey();
String value = hashValue.getValue();
- pipeline.hset(hashKey, fieldKey, value);
+ transaction.sadd(hashKey, fieldKey, value);
}
@FinishBundle
public void finishBundle() {
- if (this.pipeline.isInMulti()) {
- this.pipeline.exec();
- this.pipeline.sync();
+ if (batchCount > 0) {
+ transaction.exec();
+ }
+ if (transaction != null) {
+ transaction.close();
}
- this.batchCount = 0;
+ transaction = null;
+ batchCount = 0;
}
@Teardown
@@ -140,11 +138,11 @@ abstract static class Builder {
Builder() {
}
- abstract RedisHashIO.Write.Builder setExpireTime(Long expireTimeMillis);
+ abstract WritingInMemoryStore.Write.Builder setExpireTime(Long expireTimeMillis);
- abstract RedisHashIO.Write build();
+ abstract WritingInMemoryStore.Write build();
- abstract RedisHashIO.Write.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);
+ abstract WritingInMemoryStore.Write.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);
}
diff --git a/src/test/java/com/viveknaskar/functions/ProcessingPPIDTest.java b/src/test/java/com/viveknaskar/functions/ProcessingPPIDTest.java
index ff93401..17b4905 100644
--- a/src/test/java/com/viveknaskar/functions/ProcessingPPIDTest.java
+++ b/src/test/java/com/viveknaskar/functions/ProcessingPPIDTest.java
@@ -20,7 +20,7 @@ public class ProcessingPPIDTest {
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
- private static StorageToRedisOptions options = PipelineOptionsFactory.create()
+ private static final StorageToRedisOptions options = PipelineOptionsFactory.create()
.as(StorageToRedisOptions.class);
private static final String[] INPUT_DATA = new String[] {
diff --git a/src/test/java/com/viveknaskar/functions/ProcessingRecordsTest.java b/src/test/java/com/viveknaskar/functions/ProcessingRecordsTest.java
index a0de52e..6bd4d47 100644
--- a/src/test/java/com/viveknaskar/functions/ProcessingRecordsTest.java
+++ b/src/test/java/com/viveknaskar/functions/ProcessingRecordsTest.java
@@ -20,7 +20,7 @@ public class ProcessingRecordsTest {
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
- private static StorageToRedisOptions options = PipelineOptionsFactory.create()
+ private static final StorageToRedisOptions options = PipelineOptionsFactory.create()
.as(StorageToRedisOptions.class);
private static final String[] INPUT_DATA = new String[] {
diff --git a/src/test/java/com/viveknaskar/functions/TransformingDataTest.java b/src/test/java/com/viveknaskar/functions/TransformingDataTest.java
index 2bcfa1c..4c793b9 100644
--- a/src/test/java/com/viveknaskar/functions/TransformingDataTest.java
+++ b/src/test/java/com/viveknaskar/functions/TransformingDataTest.java
@@ -19,7 +19,7 @@ public class TransformingDataTest {
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
- private static StorageToRedisOptions options = PipelineOptionsFactory.create()
+ private static final StorageToRedisOptions options = PipelineOptionsFactory.create()
.as(StorageToRedisOptions.class);
private static final String INPUT_DATA =
diff --git a/src/test/java/com/viveknaskar/functions/RedisHashIOTest.java b/src/test/java/com/viveknaskar/functions/WritingInMemoryStoreTest.java
similarity index 72%
rename from src/test/java/com/viveknaskar/functions/RedisHashIOTest.java
rename to src/test/java/com/viveknaskar/functions/WritingInMemoryStoreTest.java
index 1509e92..69014e3 100644
--- a/src/test/java/com/viveknaskar/functions/RedisHashIOTest.java
+++ b/src/test/java/com/viveknaskar/functions/WritingInMemoryStoreTest.java
@@ -11,8 +11,11 @@
import org.mockito.junit.MockitoJUnitRunner;
import redis.clients.jedis.Jedis;
+import java.io.IOException;
+import java.util.Set;
+
@RunWith(MockitoJUnitRunner.class)
-public class RedisHashIOTest {
+public class WritingInMemoryStoreTest {
private static final String REDIS_HOST = "localhost";
@@ -33,27 +36,27 @@ public static void beforeClass() throws Exception {
}
@AfterClass
- public static void afterClass() {
+ public static void afterClass() throws IOException {
client.close();
server.stop();
}
@Test
- public void TestWriteHashWithConfig() {
+ public void testWriteHashWithConfig() {
KV fieldValue = KV.of("hash12", "p11");
- KV> record = KV.of("hash11:bbbbbb", fieldValue );
+ KV> record = KV.of("hash11:bbbbbb", fieldValue);
PCollection>> write = pipeline.apply(Create.of(record));
- write.apply("Writing Hash into Redis", RedisHashIO.write()
+ write.apply("Writing Hash into Redis", WritingInMemoryStore.write()
.withConnectionConfiguration(RedisConnectionConfiguration
- .create(REDIS_HOST, port)));
+ .create(REDIS_HOST, port)));
pipeline.run();
- String ppid = client.hget("hash11:bbbbbb", "hash12");
- Assert.assertEquals(ppid, "p11");
-
+ Set members = client.smembers("hash11:bbbbbb");
+ boolean isMember = members.contains("hash12");
+ Assert.assertTrue("The record should be a member in the set", isMember);
}
}
\ No newline at end of file