Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 20 additions & 30 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@

<groupId>com.viveknaskar</groupId>
<artifactId>dataflow-redis-example</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
<slf4j.version>1.7.25</slf4j.version>
<beam.version>2.26.0</beam.version>
<maven-compiler-plugin.version>3.12.1</maven-compiler-plugin.version>
<exec-maven-plugin.version>3.1.0</exec-maven-plugin.version>
<slf4j.version>2.0.12</slf4j.version>
<beam.version>2.56.0</beam.version>
<jedis-mock.version>1.1.1</jedis-mock.version>
<auto-value.version>1.10.4</auto-value.version>
<mockito.version>5.2.0</mockito.version>
<junit.version>4.13.2</junit.version>
<hamcrest.version>1.3</hamcrest.version>
<slf4j-jdk14.version>2.0.13</slf4j-jdk14.version>
</properties>

<repositories>
Expand All @@ -38,26 +44,10 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>11</source>
<target>11</target>
<source>17</source>
<target>17</target>
</configuration>
</plugin>

<!-- The Sonarqube must be started. Default host is localhost:9000 -->
<!--<plugin>
<groupId>org.sonarsource.scanner.maven</groupId>
<artifactId>sonar-maven-plugin</artifactId>
<version>3.6.0.1398</version>
<executions>
<execution>
<phase>verify</phase>
<goals>
<goal>sonar</goal>
</goals>
</execution>
</executions>
</plugin>-->

</plugins>

<pluginManagement>
Expand Down Expand Up @@ -115,34 +105,34 @@
<dependency>
<groupId>com.github.fppt</groupId>
<artifactId>jedis-mock</artifactId>
<version>0.1.16</version>
<version>${jedis-mock.version}</version>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>1.7.4</version>
<version>${auto-value.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.18.0</version>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>

Expand All @@ -155,7 +145,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>${slf4j.version}</version>
<version>${slf4j-jdk14.version}</version>
</dependency>
</dependencies>

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/viveknaskar/StarterPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static void main(String[] args) {
* Flushing the Memorystore if there are records in the input file
*/
PCollection<String> flushFlag = lines.apply("Checking Data in input file", Count.globally())
.apply("Flushing the data store", FlushingMemorystore.read()
.apply("Flushing the data store", FlushingMemoryStore.read()
.withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));

Expand Down Expand Up @@ -110,7 +110,7 @@ public static void main(String[] args) {

ppidDataSet.apply(Wait.on(flushFlag))
.apply("Creating PPID index",
RedisHashIO.write().withConnectionConfiguration(RedisConnectionConfiguration
WritingInMemoryStore.write().withConnectionConfiguration(RedisConnectionConfiguration
.create(options.getRedisHost(), options.getRedisPort())));

p.run();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
package com.viveknaskar.functions;

import org.checkerframework.checker.nullness.qual.Nullable;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Transaction;

public class FlushingMemorystore extends DoFn<Long, String> {
import javax.annotation.Nullable;
import java.util.Objects;

private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemorystore.class);
public class FlushingMemoryStore extends DoFn<Long, String> {

public static FlushingMemorystore.Read read() {
return (new AutoValue_FlushingMemorystore_Read.Builder())
private static final Logger LOGGER = LoggerFactory.getLogger(FlushingMemoryStore.class);

public static FlushingMemoryStore.Read read() {
return (new AutoValue_FlushingMemoryStore_Read.Builder())
.setConnectionConfiguration(RedisConnectionConfiguration.create()).build();
}

Expand All @@ -33,79 +35,74 @@ public Read() {

@Nullable
abstract Long expireTime();
abstract FlushingMemorystore.Read.Builder toBuilder();
abstract FlushingMemoryStore.Read.Builder toBuilder();

public FlushingMemorystore.Read withEndpoint(String host, int port) {
public FlushingMemoryStore.Read withEndpoint(String host, int port) {
Preconditions.checkArgument(host != null, "host cannot be null");
Preconditions.checkArgument(port > 0, "port cannot be negative or 0");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withHost(host).withPort(port)).build();
}

public FlushingMemorystore.Read withAuth(String auth) {
public FlushingMemoryStore.Read withAuth(String auth) {
Preconditions.checkArgument(auth != null, "auth cannot be null");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withAuth(auth)).build();
}

public FlushingMemorystore.Read withTimeout(int timeout) {
public FlushingMemoryStore.Read withTimeout(int timeout) {
Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative");
return this.toBuilder().setConnectionConfiguration(this.connectionConfiguration().withTimeout(timeout)).build();
}

public FlushingMemorystore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
public FlushingMemoryStore.Read withConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration) {
Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
return this.toBuilder().setConnectionConfiguration(connectionConfiguration).build();
}

public FlushingMemorystore.Read withExpireTime(Long expireTimeMillis) {
public FlushingMemoryStore.Read withExpireTime(Long expireTimeMillis) {
Preconditions.checkArgument(expireTimeMillis != null, "expireTimeMillis cannot be null");
Preconditions.checkArgument(expireTimeMillis > 0L, "expireTimeMillis cannot be negative or 0");
return this.toBuilder().setExpireTime(expireTimeMillis).build();
}

public PCollection<String> expand(PCollection<Long> input) {
Preconditions.checkArgument(this.connectionConfiguration() != null, "withConnectionConfiguration() is required");
return input.apply(ParDo.of(new FlushingMemorystore.Read.ReadFn(this)));
return input.apply(ParDo.of(new FlushingMemoryStore.Read.ReadFn(this)));
}

@Setup
public Jedis setup() {
return this.connectionConfiguration().connect();
return Objects.requireNonNull(this.connectionConfiguration()).connect();
}

private static class ReadFn extends DoFn<Long, String> {
private static final int DEFAULT_BATCH_SIZE = 1000;
private final FlushingMemorystore.Read spec;
private final FlushingMemoryStore.Read spec;
private transient Jedis jedis;
private transient Pipeline pipeline;
private transient @Nullable Transaction transaction;
private int batchCount;

public ReadFn(FlushingMemorystore.Read spec) {
public ReadFn(FlushingMemoryStore.Read spec) {
this.spec = spec;
}

@Setup
public void setup() {
this.jedis = this.spec.connectionConfiguration().connect();
this.jedis = Objects.requireNonNull(this.spec.connectionConfiguration()).connect();
}

@StartBundle
public void startBundle() {
this.pipeline = this.jedis.pipelined();
this.pipeline.multi();
this.batchCount = 0;
transaction = jedis.multi();
batchCount = 0;
}

@ProcessElement
public void processElement(@Element Long count, OutputReceiver<String> out) {
batchCount++;

if(count!=null && count > 0) {
if (pipeline.isInMulti()) {
pipeline.exec();
pipeline.sync();
jedis.flushDB();
LOGGER.info("*****The memorystore is flushed*****");
}
jedis.flushDB();
LOGGER.info("*****The Memorystore is flushed*****");
out.output("SUCCESS");
} else {
LOGGER.info("No Records are there in the input file");
Expand All @@ -116,11 +113,14 @@ public void processElement(@Element Long count, OutputReceiver<String> out) {

@FinishBundle
public void finishBundle() {
if (this.pipeline.isInMulti()) {
this.pipeline.exec();
this.pipeline.sync();
if (batchCount > 0) {
transaction.exec();
}
if (transaction != null) {
transaction.close();
}
this.batchCount=0;
transaction = null;
batchCount = 0;
}

@Teardown
Expand All @@ -136,11 +136,11 @@ abstract static class Builder {
Builder() {
}

abstract FlushingMemorystore.Read.Builder setExpireTime(Long expireTimeMillis);
abstract FlushingMemoryStore.Read.Builder setExpireTime(Long expireTimeMillis);

abstract FlushingMemorystore.Read build();
abstract FlushingMemoryStore.Read build();

abstract FlushingMemorystore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);
abstract FlushingMemoryStore.Read.Builder setConnectionConfiguration(RedisConnectionConfiguration connectionConfiguration);

}

Expand Down
Loading