Skip to content

Commit 7c871e5

Browse files
committed
Add blocking queue item reader and writer
Resolves #2350 Resolves #2044
1 parent a293f3e commit 7c871e5

File tree

8 files changed

+444
-0
lines changed

8 files changed

+444
-0
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.queue;
17+
18+
import org.springframework.batch.item.ItemReader;
19+
20+
import java.util.concurrent.BlockingQueue;
21+
import java.util.concurrent.TimeUnit;
22+
23+
/**
24+
* This is an {@link ItemReader} that reads items from a {@link BlockingQueue}. It stops
25+
* reading (ie returns {@code null}) if no items are available in the queue after a
26+
* configurable timeout.
27+
*
28+
* @param <T> type of items to read.
29+
* @author Mahmoud Ben Hassine
30+
* @since 5.2.0
31+
*/
32+
public class BlockingQueueItemReader<T> implements ItemReader<T> {
33+
34+
private final BlockingQueue<T> queue;
35+
36+
private long timeout = 1L;
37+
38+
private TimeUnit timeUnit = TimeUnit.SECONDS;
39+
40+
/**
41+
* Create a new {@link BlockingQueueItemReader}.
42+
* @param queue the queue to read items from
43+
*/
44+
public BlockingQueueItemReader(BlockingQueue<T> queue) {
45+
this.queue = queue;
46+
}
47+
48+
/**
49+
* Set the reading timeout and time unit. Defaults to 1 second.
50+
* @param timeout the timeout after which the reader stops reading
51+
* @param timeUnit the unit of the timeout
52+
*/
53+
public void setTimeout(long timeout, TimeUnit timeUnit) {
54+
this.timeout = timeout;
55+
this.timeUnit = timeUnit;
56+
}
57+
58+
@Override
59+
public T read() throws Exception {
60+
return this.queue.poll(this.timeout, this.timeUnit);
61+
}
62+
63+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.queue;
17+
18+
import org.springframework.batch.item.Chunk;
19+
import org.springframework.batch.item.ItemWriter;
20+
21+
import java.util.concurrent.BlockingQueue;
22+
23+
/**
24+
* This is an {@link ItemWriter} that writes items to a {@link BlockingQueue}.
25+
*
26+
* @param <T> type of items to write
27+
* @since 5.2.0
28+
* @author Mahmoud Ben Hassine
29+
*/
30+
public class BlockingQueueItemWriter<T> implements ItemWriter<T> {
31+
32+
private final BlockingQueue<T> queue;
33+
34+
/**
35+
* Create a new {@link BlockingQueueItemWriter}.
36+
* @param queue the queue to write items to
37+
*/
38+
public BlockingQueueItemWriter(BlockingQueue<T> queue) {
39+
this.queue = queue;
40+
}
41+
42+
@Override
43+
public void write(Chunk<? extends T> items) throws Exception {
44+
for (T item : items) {
45+
this.queue.put(item);
46+
}
47+
}
48+
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.queue.builder;
17+
18+
import java.util.concurrent.BlockingQueue;
19+
import java.util.concurrent.TimeUnit;
20+
21+
import org.springframework.batch.item.queue.BlockingQueueItemReader;
22+
import org.springframework.util.Assert;
23+
24+
/**
25+
* Builder for {@link BlockingQueueItemReader}.
26+
*
27+
* @param <T> type of items to read
28+
* @since 5.2.0
29+
* @author Mahmoud Ben Hassine
30+
*/
31+
public class BlockingQueueItemReaderBuilder<T> {
32+
33+
private BlockingQueue<T> queue;
34+
35+
private long timeout = 1L;
36+
37+
private TimeUnit timeUnit = TimeUnit.SECONDS;
38+
39+
/**
40+
* Set the queue to read items from.
41+
* @param queue the queue to read items from.
42+
* @return this instance of the builder
43+
*/
44+
public BlockingQueueItemReaderBuilder<T> queue(BlockingQueue<T> queue) {
45+
this.queue = queue;
46+
return this;
47+
}
48+
49+
/**
50+
* Set the reading timeout. Defaults to 1 second.
51+
* @param timeout the reading timeout.
52+
* @return this instance of the builder
53+
*/
54+
public BlockingQueueItemReaderBuilder<T> timeout(long timeout, TimeUnit timeUnit) {
55+
this.timeout = timeout;
56+
this.timeUnit = timeUnit;
57+
return this;
58+
}
59+
60+
/**
61+
* Create a configured {@link BlockingQueueItemReader}.
62+
* @return a configured {@link BlockingQueueItemReader}.
63+
*/
64+
public BlockingQueueItemReader<T> build() {
65+
Assert.state(this.queue != null, "The blocking queue is required.");
66+
BlockingQueueItemReader<T> blockingQueueItemReader = new BlockingQueueItemReader<>(this.queue);
67+
blockingQueueItemReader.setTimeout(this.timeout, this.timeUnit);
68+
return blockingQueueItemReader;
69+
}
70+
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.queue.builder;
17+
18+
import java.util.concurrent.BlockingQueue;
19+
20+
import org.springframework.batch.item.queue.BlockingQueueItemWriter;
21+
import org.springframework.util.Assert;
22+
23+
/**
24+
* Builder for a {@link BlockingQueueItemWriter}.
25+
*
26+
* @param <T> type of items to write
27+
* @since 5.2.0
28+
* @author Mahmoud Ben Hassine
29+
*/
30+
public class BlockingQueueItemWriterBuilder<T> {
31+
32+
private BlockingQueue<T> queue;
33+
34+
/**
35+
* Create a new {@link BlockingQueueItemWriterBuilder}
36+
* @param queue the queue to write items to
37+
* @return this instance of the builder
38+
*/
39+
public BlockingQueueItemWriterBuilder<T> queue(BlockingQueue<T> queue) {
40+
this.queue = queue;
41+
return this;
42+
}
43+
44+
/**
45+
* Create a configured {@link BlockingQueueItemWriter}.
46+
* @return a configured {@link BlockingQueueItemWriter}.
47+
*/
48+
public BlockingQueueItemWriter<T> build() {
49+
Assert.state(this.queue != null, "The blocking queue is required.");
50+
return new BlockingQueueItemWriter<>(this.queue);
51+
}
52+
53+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.queue;
17+
18+
import java.util.concurrent.ArrayBlockingQueue;
19+
import java.util.concurrent.BlockingQueue;
20+
import java.util.concurrent.TimeUnit;
21+
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.Test;
24+
25+
import org.springframework.batch.item.queue.builder.BlockingQueueItemReaderBuilder;
26+
27+
/**
28+
* Test class for {@link BlockingQueueItemReader}.
29+
*
30+
* @author Mahmoud Ben Hassine
31+
*/
32+
class BlockingQueueItemReaderTests {
33+
34+
@Test
35+
void testRead() throws Exception {
36+
// given
37+
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
38+
queue.put("foo");
39+
BlockingQueueItemReader<String> reader = new BlockingQueueItemReaderBuilder<String>().queue(queue)
40+
.timeout(10, TimeUnit.MILLISECONDS)
41+
.build();
42+
43+
// when & then
44+
Assertions.assertEquals("foo", reader.read());
45+
Assertions.assertNull(reader.read());
46+
}
47+
48+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.queue;
17+
18+
import java.util.List;
19+
import java.util.concurrent.ArrayBlockingQueue;
20+
import java.util.concurrent.BlockingQueue;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import org.springframework.batch.item.Chunk;
25+
import org.springframework.batch.item.queue.builder.BlockingQueueItemWriterBuilder;
26+
27+
import static org.junit.jupiter.api.Assertions.assertTrue;
28+
29+
/**
30+
* Test class for {@link BlockingQueueItemWriter}.
31+
*
32+
* @author Mahmoud Ben Hassine
33+
*/
34+
class BlockingQueueItemWriterTests {
35+
36+
@Test
37+
void testWrite() throws Exception {
38+
// given
39+
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
40+
BlockingQueueItemWriter<String> writer = new BlockingQueueItemWriterBuilder<String>().queue(queue).build();
41+
42+
// when
43+
writer.write(Chunk.of("foo", "bar"));
44+
45+
// then
46+
assertTrue(queue.containsAll(List.of("foo", "bar")));
47+
}
48+
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.queue.builder;
17+
18+
import java.util.concurrent.ArrayBlockingQueue;
19+
import java.util.concurrent.BlockingQueue;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import org.springframework.batch.item.queue.BlockingQueueItemReader;
24+
import org.springframework.test.util.ReflectionTestUtils;
25+
26+
import static org.junit.jupiter.api.Assertions.assertEquals;
27+
import static org.junit.jupiter.api.Assertions.assertNotNull;
28+
import static org.junit.jupiter.api.Assertions.assertThrows;
29+
30+
/**
31+
* Test class for {@link BlockingQueueItemReaderBuilder}.
32+
*
33+
* @author Mahmoud Ben Hassine
34+
*/
35+
class BlockingQueueItemReaderBuilderTests {
36+
37+
@Test
38+
void testMandatoryQueue() {
39+
assertThrows(IllegalStateException.class, () -> new BlockingQueueItemReaderBuilder<String>().build());
40+
}
41+
42+
@Test
43+
void testBuildReader() {
44+
// given
45+
BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
46+
47+
// when
48+
BlockingQueueItemReader<String> reader = new BlockingQueueItemReaderBuilder<String>().queue(queue).build();
49+
50+
// then
51+
assertNotNull(reader);
52+
assertEquals(queue, ReflectionTestUtils.getField(reader, "queue"));
53+
}
54+
55+
}

0 commit comments

Comments
 (0)