Skip to content

Commit 69665d8

Browse files
committed
Fix chunk tracker lifecycle in chunk-oriented step
Resolves #5126
1 parent b58c842 commit 69665d8

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -349,10 +349,12 @@ public void afterPropertiesSet() throws Exception {
349349
@Override
350350
protected void open(ExecutionContext executionContext) throws Exception {
351351
this.compositeItemStream.open(executionContext);
352+
this.chunkTracker.get().init();
352353
}
353354

354355
@Override
355356
protected void close(ExecutionContext executionContext) throws Exception {
357+
this.chunkTracker.get().reset();
356358
this.compositeItemStream.close();
357359
}
358360

@@ -505,7 +507,7 @@ private Chunk<I> readChunk(StepContribution contribution) throws Exception {
505507
this.compositeItemReadListener.beforeRead();
506508
item = doRead();
507509
if (item == null) {
508-
this.chunkTracker.get().noMoreItems();
510+
this.chunkTracker.get().reset();
509511
}
510512
else {
511513
contribution.incrementReadCount();
@@ -757,9 +759,13 @@ private boolean isConcurrent() {
757759

758760
private static class ChunkTracker {
759761

760-
private boolean moreItems = true;
762+
private boolean moreItems;
761763

762-
void noMoreItems() {
764+
void init() {
765+
this.moreItems = true;
766+
}
767+
768+
void reset() {
763769
this.moreItems = false;
764770
}
765771

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedStepIntegrationTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,28 @@
1919
import org.junit.jupiter.api.Test;
2020

2121
import org.springframework.batch.core.ExitStatus;
22+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
23+
import org.springframework.batch.core.configuration.annotation.EnableJdbcJobRepository;
2224
import org.springframework.batch.core.job.Job;
2325
import org.springframework.batch.core.job.JobExecution;
26+
import org.springframework.batch.core.job.builder.JobBuilder;
2427
import org.springframework.batch.core.job.parameters.JobParameters;
2528
import org.springframework.batch.core.job.parameters.JobParametersBuilder;
2629
import org.springframework.batch.core.launch.JobOperator;
2730
import org.springframework.batch.core.repository.JobRepository;
2831
import org.springframework.batch.core.step.Step;
2932
import org.springframework.batch.core.step.StepExecution;
3033
import org.springframework.batch.core.step.builder.ChunkOrientedStepBuilder;
34+
import org.springframework.batch.core.step.builder.StepBuilder;
3135
import org.springframework.batch.infrastructure.item.ItemProcessor;
3236
import org.springframework.batch.infrastructure.item.ItemReader;
3337
import org.springframework.batch.infrastructure.item.ItemWriter;
38+
import org.springframework.batch.infrastructure.item.support.ListItemWriter;
3439
import org.springframework.context.ApplicationContext;
3540
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
3641
import org.springframework.context.annotation.Bean;
3742
import org.springframework.context.annotation.Configuration;
43+
import org.springframework.context.annotation.Import;
3844
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3945
import org.springframework.jdbc.core.JdbcTemplate;
4046
import org.springframework.jdbc.support.JdbcTransactionManager;
@@ -160,6 +166,23 @@ void testConcurrentChunkOrientedStepFailure() throws Exception {
160166
System.clearProperty("fail");
161167
}
162168

169+
// Issue: https://github.com/spring-projects/spring-batch/issues/5126
170+
@Test
171+
void testChunkOrientedStepReExecution() throws Exception {
172+
// given
173+
ApplicationContext context = new AnnotationConfigApplicationContext(StepConfiguration.class);
174+
JobOperator jobOperator = context.getBean(JobOperator.class);
175+
Job job = context.getBean(Job.class);
176+
177+
// when
178+
jobOperator.start(job, new JobParametersBuilder().addLong("run.id", 1L).toJobParameters());
179+
jobOperator.start(job, new JobParametersBuilder().addLong("run.id", 2L).toJobParameters());
180+
181+
// then
182+
ListItemWriter<String> itemWriter = context.getBean(ListItemWriter.class);
183+
Assertions.assertEquals(2, itemWriter.getWrittenItems().size());
184+
}
185+
163186
@Configuration
164187
static class ChunkOrientedStepConfiguration {
165188

@@ -193,4 +216,30 @@ public Step concurrentChunkOrientedStep(JobRepository jobRepository, JdbcTransac
193216

194217
}
195218

219+
@Configuration
220+
@EnableBatchProcessing
221+
@EnableJdbcJobRepository
222+
@Import(JdbcInfrastructureConfiguration.class)
223+
static class StepConfiguration {
224+
225+
// singleton-scoped item writer acting as a global collector
226+
// of written items across job executions
227+
@Bean
228+
public ListItemWriter<String> itemWriter() {
229+
return new ListItemWriter<>();
230+
}
231+
232+
@Bean
233+
Job job(JobRepository jobRepository, JdbcTransactionManager transactionManager,
234+
ListItemWriter<String> itemWriter) {
235+
ChunkOrientedStep<String, String> step = new StepBuilder("step", jobRepository).<String, String>chunk(1)
236+
.transactionManager(transactionManager)
237+
.reader(new SingleItemStreamReader<>("foo"))
238+
.writer(itemWriter)
239+
.build();
240+
return new JobBuilder(jobRepository).start(step).build();
241+
}
242+
243+
}
244+
196245
}

0 commit comments

Comments
 (0)