From 07c2c0b2b2a222ec88f651da4bd319d74a646959 Mon Sep 17 00:00:00 2001 From: Vivek Naskar Date: Mon, 8 Jan 2024 10:28:32 +0530 Subject: [PATCH 1/3] upgraded the code changes to java 17 along with the test cases upgrade --- pom.xml | 30 ++------ .../java/com/viveknaskar/StarterPipeline.java | 4 +- ...orystore.java => FlushingMemoryStore.java} | 70 +++++++++---------- ...sHashIO.java => WritingInMemoryStore.java} | 61 ++++++++-------- ...est.java => WritingInMemoryStoreTest.java} | 18 ++--- 5 files changed, 85 insertions(+), 98 deletions(-) rename src/main/java/com/viveknaskar/functions/{FlushingMemorystore.java => FlushingMemoryStore.java} (65%) rename src/main/java/com/viveknaskar/functions/{RedisHashIO.java => WritingInMemoryStore.java} (69%) rename src/test/java/com/viveknaskar/functions/{RedisHashIOTest.java => WritingInMemoryStoreTest.java} (76%) diff --git a/pom.xml b/pom.xml index d085dd4..0af47ae 100644 --- a/pom.xml +++ b/pom.xml @@ -7,14 +7,14 @@ com.viveknaskar dataflow-redis-example - 1.0.1 + 1.0.2 UTF-8 - 3.7.0 - 1.6.0 - 1.7.25 - 2.26.0 + 3.11.0 + 3.1.0 + 2.0.7 + 2.52.0 @@ -38,26 +38,10 @@ maven-compiler-plugin ${maven-compiler-plugin.version} - 11 - 11 + 17 + 17 - - - - diff --git a/src/main/java/com/viveknaskar/StarterPipeline.java b/src/main/java/com/viveknaskar/StarterPipeline.java index b814c7c..b69bd14 100644 --- a/src/main/java/com/viveknaskar/StarterPipeline.java +++ b/src/main/java/com/viveknaskar/StarterPipeline.java @@ -35,7 +35,7 @@ public static void main(String[] args) { * Flushing the Memorystore if there are records in the input file */ PCollection flushFlag = lines.apply("Checking Data in input file", Count.globally()) - .apply("Flushing the data store", FlushingMemorystore.read() + .apply("Flushing the data store", FlushingMemoryStore.read() .withConnectionConfiguration(RedisConnectionConfiguration .create(options.getRedisHost(), options.getRedisPort()))); @@ -110,7 +110,7 @@ public static void main(String[] args) { ppidDataSet.apply(Wait.on(flushFlag)) .apply("Creating PPID index", - RedisHashIO.write().withConnectionConfiguration(RedisConnectionConfiguration + WritingInMemoryStore.write().withConnectionConfiguration(RedisConnectionConfiguration .create(options.getRedisHost(), options.getRedisPort()))); p.run(); diff --git a/src/main/java/com/viveknaskar/functions/FlushingMemorystore.java b/src/main/java/com/viveknaskar/functions/FlushingMemoryStore.java similarity index 65% rename from src/main/java/com/viveknaskar/functions/FlushingMemorystore.java rename to src/main/java/com/viveknaskar/functions/FlushingMemoryStore.java index 7e6fcd8..d9f9915 100644 --- a/src/main/java/com/viveknaskar/functions/FlushingMemorystore.java +++ b/src/main/java/com/viveknaskar/functions/FlushingMemoryStore.java @@ -1,24 +1,26 @@ package com.viveknaskar.functions; -import org.checkerframework.checker.nullness.qual.Nullable; import com.google.auto.value.AutoValue; import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions; +import org.apache.beam.vendor.grpc.v1p54p0.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; -import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Transaction; -public class FlushingMemorystore extends DoFn { +import javax.annotation.Nullable; +import java.util.Objects; - private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class); +public class FlushingMemoryStore extends DoFn { - public static FlushingMemorystore.Read read() { - return (new AutoValue_FlushingMemorystore_Read.Builder()) + private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemoryStore.class); + + public static FlushingMemoryStore.Read read() { + return (new AutoValue_FlushingMemoryStore_Read.Builder()) .setConnectionConfiguration(RedisConnectionConfiguration.create()).build(); } @@ -33,30 +35,30 @@ public Read() { @Nullable abstract Long expireTime(); - abstract FlushingMemorystore.Read.Builder toBuilder(); + abstract FlushingMemoryStore.Read.Builder toBuilder(); - public FlushingMemorystore.Read withEndpoint(String host, int port) { + public FlushingMemoryStore.Read withEndpoint(String host, int port) { Preconditions.checkArgument(host != null, "host cannot be null"); Preconditions.checkArgument(port > 0, "port cannot be negative or 0"); return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build(); } - public FlushingMemorystore.Read withAuth(String auth) { + public FlushingMemoryStore.Read withAuth(String auth) { Preconditions.checkArgument(auth != null, "auth cannot be null"); return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build(); } - public FlushingMemorystore.Read withTimeout(int timeout) { + public FlushingMemoryStore.Read withTimeout(int timeout) { Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative"); return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build(); } - public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) { + public FlushingMemoryStore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) { Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null"); return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build(); } - public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) { + public FlushingMemoryStore.Read withExpireTime(Long expireTimeMillis) { Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null"); Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0"); return this.toBuilder().setExpireTime(expireTimeMillis).build(); @@ -64,35 +66,34 @@ public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) { public PCollection expand(PCollection input) { Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required"); - return input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this))); + return input.apply(ParDo.of(new FlushingMemoryStore.Read.ReadFn(this))); } @Setup public Jedis setup() { - return this.connectionConfiguration().connect(); + return Objects.requireNonNull(this.connectionConfiguration()).connect(); } private static class ReadFn extends DoFn { private static final int DEFAULT_BATCH_SIZE = 1000; - private final FlushingMemorystore.Read spec; + private final FlushingMemoryStore.Read spec; private transient Jedis jedis; - private transient Pipeline pipeline; + private transient @Nullable Transaction transaction; private int batchCount; - public ReadFn(FlushingMemorystore.Read spec) { + public ReadFn(FlushingMemoryStore.Read spec) { this.spec = spec; } @Setup public void setup() { - this.jedis = this.spec.connectionConfiguration().connect(); + this.jedis = Objects.requireNonNull(this.spec.connectionConfiguration()).connect(); } @StartBundle public void startBundle() { - this.pipeline = this.jedis.pipelined(); - this.pipeline.multi(); - this.batchCount = 0; + transaction = jedis.multi(); + batchCount = 0; } @ProcessElement @@ -100,12 +101,8 @@ public void processElement(@Element Long count, OutputReceiver out) { batchCount++; if(count!=null && count > 0) { - if (pipeline.isInMulti()) { - pipeline.exec(); - pipeline.sync(); - jedis.flushDB(); - LOGGER.info("*****The memorystore is flushed*****"); - } + jedis.flushDB(); + LOGGER.info("*****The Memorystore is flushed*****"); out.output("SUCCESS"); } else { LOGGER.info("No Records are there in the input file"); @@ -116,11 +113,14 @@ public void processElement(@Element Long count, OutputReceiver out) { @FinishBundle public void finishBundle() { - if (this.pipeline.isInMulti()) { - this.pipeline.exec(); - this.pipeline.sync(); + if (batchCount > 0) { + transaction.exec(); + } + if (transaction != null) { + transaction.close(); } - this.batchCount=0; + transaction = null; + batchCount = 0; } @Teardown @@ -136,11 +136,11 @@ abstract static class Builder { Builder() { } - abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis); + abstract FlushingMemoryStore.Read.Builder setExpireTime(Long expireTimeMillis); - abstract FlushingMemorystore.Read build(); + abstract FlushingMemoryStore.Read build(); - abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration); + abstract FlushingMemoryStore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration); } diff --git a/src/main/java/com/viveknaskar/functions/RedisHashIO.java b/src/main/java/com/viveknaskar/functions/WritingInMemoryStore.java similarity index 69% rename from src/main/java/com/viveknaskar/functions/RedisHashIO.java rename to src/main/java/com/viveknaskar/functions/WritingInMemoryStore.java index 4c62f2c..4a45722 100644 --- a/src/main/java/com/viveknaskar/functions/RedisHashIO.java +++ b/src/main/java/com/viveknaskar/functions/WritingInMemoryStore.java @@ -1,6 +1,5 @@ package com.viveknaskar.functions; -import org.checkerframework.checker.nullness.qual.Nullable; import com.google.auto.value.AutoValue; import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration; import org.apache.beam.sdk.transforms.DoFn; @@ -9,15 +8,16 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions; +import org.apache.beam.vendor.grpc.v1p54p0.com.google.common.base.Preconditions; +import org.checkerframework.checker.nullness.qual.Nullable; import redis.clients.jedis.Jedis; -import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Transaction; -public class RedisHashIO { +public class WritingInMemoryStore { - public static RedisHashIO.Write write() { + public static WritingInMemoryStore.Write write() { - return (new AutoValue_RedisHashIO_Write.Builder()) + return (new AutoValue_WritingInMemoryStore_Write.Builder()) .setConnectionConfiguration(RedisConnectionConfiguration.create()).build(); } @@ -34,30 +34,30 @@ public Write() { @Nullable abstract Long expireTime(); - abstract RedisHashIO.Write.Builder toBuilder(); + abstract WritingInMemoryStore.Write.Builder toBuilder(); - public RedisHashIO.Write withEndpoint(String host, int port) { + public WritingInMemoryStore.Write withEndpoint(String host, int port) { Preconditions.checkArgument(host != null, "host cannot be null"); Preconditions.checkArgument(port > 0, "port cannot be negative or 0"); return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build(); } - public RedisHashIO.Write withAuth(String auth) { + public WritingInMemoryStore.Write withAuth(String auth) { Preconditions.checkArgument(auth != null, "auth cannot be null"); return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build(); } - public RedisHashIO.Write withTimeout(int timeout) { + public WritingInMemoryStore.Write withTimeout(int timeout) { Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative"); return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build(); } - public RedisHashIO.Write withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) { + public WritingInMemoryStore.Write withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) { Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null"); return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build(); } - public RedisHashIO.Write withExpireTime(Long expireTimeMillis) { + public WritingInMemoryStore.Write withExpireTime(Long expireTimeMillis) { Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null"); Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0"); return this.toBuilder().setExpireTime(expireTimeMillis).build(); @@ -65,18 +65,18 @@ public RedisHashIO.Write withExpireTime(Long expireTimeMillis) { public PDone expand(PCollection>> input) { Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required"); - input.apply(ParDo.of(new RedisHashIO.Write.WriteFn(this))); + input.apply(ParDo.of(new WritingInMemoryStore.Write.WriteFn(this))); return PDone.in(input.getPipeline()); } private static class WriteFn extends DoFn>, Void> { private static final int DEFAULT_BATCH_SIZE = 1000; - private final RedisHashIO.Write spec; + private final WritingInMemoryStore.Write spec; private transient Jedis jedis; - private transient Pipeline pipeline; + private transient @Nullable Transaction transaction; private int batchCount; - public WriteFn(RedisHashIO.Write spec) { + public WriteFn(WritingInMemoryStore.Write spec) { this.spec = spec; } @@ -87,9 +87,8 @@ public void setup() { @StartBundle public void startBundle() { - this.pipeline = this.jedis.pipelined(); - this.pipeline.multi(); - this.batchCount = 0; + transaction = jedis.multi(); + batchCount = 0; } @ProcessElement @@ -101,9 +100,8 @@ public void processElement(DoFn>, Void>.ProcessCon batchCount++; if (batchCount >= DEFAULT_BATCH_SIZE) { - pipeline.exec(); - pipeline.sync(); - pipeline.multi(); + transaction.exec(); + transaction.multi(); batchCount = 0; } } @@ -114,17 +112,20 @@ private void writeRecord(KV> record) { String fieldKey = hashValue.getKey(); String value = hashValue.getValue(); - pipeline.hset(hashKey, fieldKey, value); + transaction.sadd(hashKey, fieldKey, value); } @FinishBundle public void finishBundle() { - if (this.pipeline.isInMulti()) { - this.pipeline.exec(); - this.pipeline.sync(); + if (batchCount > 0) { + transaction.exec(); + } + if (transaction != null) { + transaction.close(); } - this.batchCount = 0; + transaction = null; + batchCount = 0; } @Teardown @@ -140,11 +141,11 @@ abstract static class Builder { Builder() { } - abstract RedisHashIO.Write.Builder setExpireTime(Long expireTimeMillis); + abstract WritingInMemoryStore.Write.Builder setExpireTime(Long expireTimeMillis); - abstract RedisHashIO.Write build(); + abstract WritingInMemoryStore.Write build(); - abstract RedisHashIO.Write.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration); + abstract WritingInMemoryStore.Write.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration); } diff --git a/src/test/java/com/viveknaskar/functions/RedisHashIOTest.java b/src/test/java/com/viveknaskar/functions/WritingInMemoryStoreTest.java similarity index 76% rename from src/test/java/com/viveknaskar/functions/RedisHashIOTest.java rename to src/test/java/com/viveknaskar/functions/WritingInMemoryStoreTest.java index 1509e92..116580c 100644 --- a/src/test/java/com/viveknaskar/functions/RedisHashIOTest.java +++ b/src/test/java/com/viveknaskar/functions/WritingInMemoryStoreTest.java @@ -11,8 +11,10 @@ import org.mockito.junit.MockitoJUnitRunner; import redis.clients.jedis.Jedis; +import java.util.Set; + @RunWith(MockitoJUnitRunner.class) -public class RedisHashIOTest { +public class WritingInMemoryStoreTest { private static final String REDIS_HOST = "localhost"; @@ -39,21 +41,21 @@ public static void afterClass() { } @Test - public void TestWriteHashWithConfig() { + public void testWriteHashWithConfig() { KV fieldValue = KV.of("hash12", "p11"); - KV> record = KV.of("hash11:bbbbbb", fieldValue ); + KV> record = KV.of("hash11:bbbbbb", fieldValue); PCollection>> write = pipeline.apply(Create.of(record)); - write.apply("Writing Hash into Redis", RedisHashIO.write() + write.apply("Writing Hash into Redis", WritingInMemoryStore.write() .withConnectionConfiguration(RedisConnectionConfiguration - .create(REDIS_HOST, port))); + .create(REDIS_HOST, port))); pipeline.run(); - String ppid = client.hget("hash11:bbbbbb", "hash12"); - Assert.assertEquals(ppid, "p11"); - + Set members = client.smembers("hash11:bbbbbb"); + boolean isMember = members.contains("hash12"); + Assert.assertTrue("The record should be a member in the set", isMember); } } \ No newline at end of file From ec8ce3c2b3b9b780d06333c40dbad782ea2a0722 Mon Sep 17 00:00:00 2001 From: Vivek Naskar Date: Mon, 8 Jan 2024 10:34:34 +0530 Subject: [PATCH 2/3] refactored the code --- src/test/java/com/viveknaskar/functions/ProcessingPPIDTest.java | 2 +- .../java/com/viveknaskar/functions/ProcessingRecordsTest.java | 2 +- .../java/com/viveknaskar/functions/TransformingDataTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/viveknaskar/functions/ProcessingPPIDTest.java b/src/test/java/com/viveknaskar/functions/ProcessingPPIDTest.java index ff93401..17b4905 100644 --- a/src/test/java/com/viveknaskar/functions/ProcessingPPIDTest.java +++ b/src/test/java/com/viveknaskar/functions/ProcessingPPIDTest.java @@ -20,7 +20,7 @@ public class ProcessingPPIDTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - private static StorageToRedisOptions options = PipelineOptionsFactory.create() + private static final StorageToRedisOptions options = PipelineOptionsFactory.create() .as(StorageToRedisOptions.class); private static final String[] INPUT_DATA = new String[] { diff --git a/src/test/java/com/viveknaskar/functions/ProcessingRecordsTest.java b/src/test/java/com/viveknaskar/functions/ProcessingRecordsTest.java index a0de52e..6bd4d47 100644 --- a/src/test/java/com/viveknaskar/functions/ProcessingRecordsTest.java +++ b/src/test/java/com/viveknaskar/functions/ProcessingRecordsTest.java @@ -20,7 +20,7 @@ public class ProcessingRecordsTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - private static StorageToRedisOptions options = PipelineOptionsFactory.create() + private static final StorageToRedisOptions options = PipelineOptionsFactory.create() .as(StorageToRedisOptions.class); private static final String[] INPUT_DATA = new String[] { diff --git a/src/test/java/com/viveknaskar/functions/TransformingDataTest.java b/src/test/java/com/viveknaskar/functions/TransformingDataTest.java index 2bcfa1c..4c793b9 100644 --- a/src/test/java/com/viveknaskar/functions/TransformingDataTest.java +++ b/src/test/java/com/viveknaskar/functions/TransformingDataTest.java @@ -19,7 +19,7 @@ public class TransformingDataTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - private static StorageToRedisOptions options = PipelineOptionsFactory.create() + private static final StorageToRedisOptions options = PipelineOptionsFactory.create() .as(StorageToRedisOptions.class); private static final String INPUT_DATA = From 4fac2d4c7b09cf3d8735212971676b227ef58d6e Mon Sep 17 00:00:00 2001 From: Vivek Naskar Date: Tue, 21 May 2024 21:45:38 +0530 Subject: [PATCH 3/3] Upgraded the dependencies to latest to make it compatible with Java 17 --- pom.xml | 26 ++++++++++++------- .../functions/FlushingMemoryStore.java | 2 +- .../functions/WritingInMemoryStore.java | 7 ++--- .../functions/WritingInMemoryStoreTest.java | 3 ++- 4 files changed, 21 insertions(+), 17 deletions(-) diff --git a/pom.xml b/pom.xml index 0af47ae..b8555fd 100644 --- a/pom.xml +++ b/pom.xml @@ -11,10 +11,16 @@ UTF-8 - 3.11.0 + 3.12.1 3.1.0 - 2.0.7 - 2.52.0 + 2.0.12 + 2.56.0 + 1.1.1 + 1.10.4 + 5.2.0 + 4.13.2 + 1.3 + 2.0.13 @@ -99,34 +105,34 @@ com.github.fppt jedis-mock - 0.1.16 + ${jedis-mock.version} com.google.auto.value auto-value - 1.7.4 + ${auto-value.version} provided org.mockito - mockito-core - 2.18.0 + mockito-inline + ${mockito.version} test junit junit - 4.13 + ${junit.version} test org.hamcrest hamcrest-all - 1.3 + ${hamcrest.version} test @@ -139,7 +145,7 @@ org.slf4j slf4j-jdk14 - ${slf4j.version} + ${slf4j-jdk14.version} diff --git a/src/main/java/com/viveknaskar/functions/FlushingMemoryStore.java b/src/main/java/com/viveknaskar/functions/FlushingMemoryStore.java index d9f9915..0d46584 100644 --- a/src/main/java/com/viveknaskar/functions/FlushingMemoryStore.java +++ b/src/main/java/com/viveknaskar/functions/FlushingMemoryStore.java @@ -1,12 +1,12 @@ package com.viveknaskar.functions; import com.google.auto.value.AutoValue; +import com.google.common.base.Preconditions; import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.vendor.grpc.v1p54p0.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; diff --git a/src/main/java/com/viveknaskar/functions/WritingInMemoryStore.java b/src/main/java/com/viveknaskar/functions/WritingInMemoryStore.java index 4a45722..2935d5f 100644 --- a/src/main/java/com/viveknaskar/functions/WritingInMemoryStore.java +++ b/src/main/java/com/viveknaskar/functions/WritingInMemoryStore.java @@ -1,6 +1,7 @@ package com.viveknaskar.functions; import com.google.auto.value.AutoValue; +import com.google.common.base.Preconditions; import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -8,7 +9,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.apache.beam.vendor.grpc.v1p54p0.com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; import redis.clients.jedis.Jedis; import redis.clients.jedis.Transaction; @@ -16,17 +16,14 @@ public class WritingInMemoryStore { public static WritingInMemoryStore.Write write() { - return (new AutoValue_WritingInMemoryStore_Write.Builder()) .setConnectionConfiguration(RedisConnectionConfiguration.create()).build(); - } @AutoValue public abstract static class Write extends PTransform>>, PDone> { - public Write() { - } + public Write() {} @Nullable abstract RedisConnectionConfiguration connectionConfiguration(); diff --git a/src/test/java/com/viveknaskar/functions/WritingInMemoryStoreTest.java b/src/test/java/com/viveknaskar/functions/WritingInMemoryStoreTest.java index 116580c..69014e3 100644 --- a/src/test/java/com/viveknaskar/functions/WritingInMemoryStoreTest.java +++ b/src/test/java/com/viveknaskar/functions/WritingInMemoryStoreTest.java @@ -11,6 +11,7 @@ import org.mockito.junit.MockitoJUnitRunner; import redis.clients.jedis.Jedis; +import java.io.IOException; import java.util.Set; @RunWith(MockitoJUnitRunner.class) @@ -35,7 +36,7 @@ public static void beforeClass() throws Exception { } @AfterClass - public static void afterClass() { + public static void afterClass() throws IOException { client.close(); server.stop(); }