Skip to content

Commit 736aa94

Browse files
committed
Add Micrometer observability to ChunkOrientedStep
1 parent a7f8c84 commit 736aa94

File tree

3 files changed

+189
-2
lines changed

3 files changed

+189
-2
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/ChunkOrientedStepBuilder.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.LinkedHashSet;
2222
import java.util.Set;
2323

24+
import io.micrometer.core.instrument.MeterRegistry;
25+
2426
import org.springframework.batch.core.annotation.AfterChunk;
2527
import org.springframework.batch.core.annotation.AfterProcess;
2628
import org.springframework.batch.core.annotation.AfterRead;
@@ -108,6 +110,8 @@ public class ChunkOrientedStepBuilder<I, O> extends StepBuilderHelper<ChunkOrien
108110

109111
private AsyncTaskExecutor asyncTaskExecutor;
110112

113+
private MeterRegistry meterRegistry;
114+
111115
ChunkOrientedStepBuilder(StepBuilderHelper<?> parent, int chunkSize) {
112116
super(parent);
113117
this.chunkSize = chunkSize;
@@ -359,6 +363,18 @@ public ChunkOrientedStepBuilder<I, O> taskExecutor(AsyncTaskExecutor asyncTaskEx
359363
return self();
360364
}
361365

366+
/**
367+
* Set the meter registry to be used for collecting metrics during step execution.
368+
* This allows for monitoring and analyzing the performance of the step. If not set,
369+
* it will default to {@link io.micrometer.core.instrument.Metrics#globalRegistry}.
370+
* @param meterRegistry the MeterRegistry to use
371+
* @return this for fluent chaining
372+
*/
373+
public ChunkOrientedStepBuilder<I, O> meterRegistry(MeterRegistry meterRegistry) {
374+
this.meterRegistry = meterRegistry;
375+
return self();
376+
}
377+
362378
@SuppressWarnings("unchecked")
363379
public ChunkOrientedStep<I, O> build() {
364380
ChunkOrientedStep<I, O> chunkOrientedStep = new ChunkOrientedStep<>(this.getName(), this.chunkSize, this.reader,
@@ -412,6 +428,9 @@ public ChunkOrientedStep<I, O> build() {
412428
});
413429
retryListeners.forEach(chunkOrientedStep::registerRetryListener);
414430
skipListeners.forEach(chunkOrientedStep::registerSkipListener);
431+
if (this.meterRegistry != null) {
432+
chunkOrientedStep.setMeterRegistry(this.meterRegistry);
433+
}
415434
try {
416435
chunkOrientedStep.afterPropertiesSet();
417436
}

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
import java.util.List;
2020
import java.util.concurrent.Future;
2121

22+
import io.micrometer.core.instrument.MeterRegistry;
23+
import io.micrometer.core.instrument.Metrics;
24+
import io.micrometer.core.instrument.Tag;
25+
import io.micrometer.core.instrument.Timer;
2226
import org.apache.commons.logging.Log;
2327
import org.apache.commons.logging.LogFactory;
2428
import org.jspecify.annotations.Nullable;
@@ -35,7 +39,11 @@
3539
import org.springframework.batch.core.listener.ItemWriteListener;
3640
import org.springframework.batch.core.listener.SkipListener;
3741
import org.springframework.batch.core.observability.BatchMetrics;
38-
import org.springframework.batch.core.observability.jfr.events.step.chunk.*;
42+
import org.springframework.batch.core.observability.jfr.events.step.chunk.ChunkScanEvent;
43+
import org.springframework.batch.core.observability.jfr.events.step.chunk.ChunkTransactionEvent;
44+
import org.springframework.batch.core.observability.jfr.events.step.chunk.ChunkWriteEvent;
45+
import org.springframework.batch.core.observability.jfr.events.step.chunk.ItemProcessEvent;
46+
import org.springframework.batch.core.observability.jfr.events.step.chunk.ItemReadEvent;
3947
import org.springframework.batch.core.scope.context.StepContext;
4048
import org.springframework.batch.core.scope.context.StepSynchronizationManager;
4149
import org.springframework.batch.core.step.StepContribution;
@@ -67,7 +75,6 @@
6775
import org.springframework.transaction.TransactionStatus;
6876
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
6977
import org.springframework.transaction.interceptor.TransactionAttribute;
70-
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
7178
import org.springframework.transaction.support.TransactionTemplate;
7279
import org.springframework.util.Assert;
7380

@@ -146,6 +153,11 @@ public class ChunkOrientedStep<I, O> extends AbstractStep {
146153
*/
147154
private AsyncTaskExecutor taskExecutor;
148155

156+
/*
157+
* Observability parameters
158+
*/
159+
private MeterRegistry meterRegistry;
160+
149161
/**
150162
* Create a new {@link ChunkOrientedStep}.
151163
* @param name the name of the step
@@ -306,6 +318,16 @@ public void registerSkipListener(SkipListener<I, O> skipListener) {
306318
this.compositeSkipListener.register(skipListener);
307319
}
308320

321+
/**
322+
* Set the meter registry to use for metrics.
323+
* @param meterRegistry the meter registry
324+
* @since 6.0
325+
*/
326+
public void setMeterRegistry(MeterRegistry meterRegistry) {
327+
Assert.notNull(meterRegistry, "Meter registry must not be null");
328+
this.meterRegistry = meterRegistry;
329+
}
330+
309331
@Override
310332
public void afterPropertiesSet() throws Exception {
311333
super.afterPropertiesSet();
@@ -335,6 +357,10 @@ public void afterPropertiesSet() throws Exception {
335357
this.retryTemplate.setRetryPolicy(this.retryPolicy);
336358
this.retryTemplate.setRetryListener(this.compositeRetryListener);
337359
}
360+
if (this.meterRegistry == null) {
361+
logger.info("No meter registry has been set. Defaulting to the global meter registry.");
362+
this.meterRegistry = Metrics.globalRegistry;
363+
}
338364
}
339365

340366
@Override
@@ -481,6 +507,8 @@ private Chunk<I> readChunk(StepContribution contribution) throws Exception {
481507
@Nullable private I readItem(StepContribution contribution) throws Exception {
482508
ItemReadEvent itemReadEvent = new ItemReadEvent(contribution.getStepExecution().getStepName(),
483509
contribution.getStepExecution().getId());
510+
Timer.Sample sample = startTimerSample();
511+
String status = BatchMetrics.STATUS_SUCCESS;
484512
I item = null;
485513
try {
486514
itemReadEvent.begin();
@@ -504,9 +532,12 @@ private Chunk<I> readChunk(StepContribution contribution) throws Exception {
504532
throw exception;
505533
}
506534
itemReadEvent.itemReadStatus = BatchMetrics.STATUS_FAILURE;
535+
status = BatchMetrics.STATUS_FAILURE;
507536
}
508537
finally {
509538
itemReadEvent.commit();
539+
stopTimerSample(sample, contribution.getStepExecution().getJobExecution().getJobInstance().getJobName(),
540+
contribution.getStepExecution().getStepName(), "item.read", "Item reading", status);
510541
}
511542
return item;
512543
}
@@ -558,6 +589,8 @@ private Chunk<O> processChunk(Chunk<I> chunk, StepContribution contribution) thr
558589
private O processItem(I item, StepContribution contribution) throws Exception {
559590
ItemProcessEvent itemProcessEvent = new ItemProcessEvent(contribution.getStepExecution().getStepName(),
560591
contribution.getStepExecution().getId());
592+
Timer.Sample sample = startTimerSample();
593+
String status = BatchMetrics.STATUS_SUCCESS;
561594
O processedItem = null;
562595
try {
563596
itemProcessEvent.begin();
@@ -578,9 +611,12 @@ private O processItem(I item, StepContribution contribution) throws Exception {
578611
throw exception;
579612
}
580613
itemProcessEvent.itemProcessStatus = BatchMetrics.STATUS_FAILURE;
614+
status = BatchMetrics.STATUS_FAILURE;
581615
}
582616
finally {
583617
itemProcessEvent.commit();
618+
stopTimerSample(sample, contribution.getStepExecution().getJobExecution().getJobInstance().getJobName(),
619+
contribution.getStepExecution().getStepName(), "item.process", "Item processing", status);
584620
}
585621
return processedItem;
586622
}
@@ -633,6 +669,8 @@ private void doSkipInProcess(I item, RetryException retryException, StepContribu
633669
private void writeChunk(Chunk<O> chunk, StepContribution contribution) throws Exception {
634670
ChunkWriteEvent chunkWriteEvent = new ChunkWriteEvent(contribution.getStepExecution().getStepName(),
635671
contribution.getStepExecution().getId(), chunk.size());
672+
Timer.Sample sample = startTimerSample();
673+
String status = BatchMetrics.STATUS_SUCCESS;
636674
try {
637675
chunkWriteEvent.begin();
638676
this.compositeItemWriteListener.beforeWrite(chunk);
@@ -644,6 +682,7 @@ private void writeChunk(Chunk<O> chunk, StepContribution contribution) throws Ex
644682
catch (Exception exception) {
645683
this.compositeItemWriteListener.onWriteError(exception, chunk);
646684
chunkWriteEvent.chunkWriteStatus = BatchMetrics.STATUS_FAILURE;
685+
status = BatchMetrics.STATUS_FAILURE;
647686
if (this.faultTolerant && exception instanceof RetryException retryException) {
648687
logger.info("Retry exhausted while attempting to write items, scanning the chunk", retryException);
649688
ChunkScanEvent chunkScanEvent = new ChunkScanEvent(contribution.getStepExecution().getStepName(),
@@ -660,6 +699,8 @@ private void writeChunk(Chunk<O> chunk, StepContribution contribution) throws Ex
660699
}
661700
finally {
662701
chunkWriteEvent.commit();
702+
stopTimerSample(sample, contribution.getStepExecution().getJobExecution().getJobInstance().getJobName(),
703+
contribution.getStepExecution().getStepName(), "chunk.write", "Chunk writing", status);
663704
}
664705
}
665706

@@ -711,6 +752,19 @@ private void scan(Chunk<O> chunk, StepContribution contribution) {
711752
}
712753
}
713754

755+
private Timer.Sample startTimerSample() {
756+
return BatchMetrics.createTimerSample(this.meterRegistry);
757+
}
758+
759+
private void stopTimerSample(Timer.Sample sample, String jobName, String stepName, String operation,
760+
String description, String status) {
761+
String fullyQualifiedMetricName = BatchMetrics.METRICS_PREFIX + operation;
762+
sample.stop(BatchMetrics.createTimer(this.meterRegistry, operation, description + " duration",
763+
Tag.of(fullyQualifiedMetricName + ".job.name", jobName),
764+
Tag.of(fullyQualifiedMetricName + ".step.name", stepName),
765+
Tag.of(fullyQualifiedMetricName + ".status", status)));
766+
}
767+
714768
private boolean isConcurrent() {
715769
return this.taskExecutor != null;
716770
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.core.step.item;
17+
18+
import java.util.List;
19+
20+
import io.micrometer.core.instrument.MeterRegistry;
21+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.Test;
24+
25+
import org.springframework.batch.core.ExitStatus;
26+
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
27+
import org.springframework.batch.core.job.Job;
28+
import org.springframework.batch.core.job.JobExecution;
29+
import org.springframework.batch.core.job.builder.JobBuilder;
30+
import org.springframework.batch.core.job.parameters.JobParameters;
31+
import org.springframework.batch.core.launch.JobOperator;
32+
import org.springframework.batch.core.repository.JobRepository;
33+
import org.springframework.batch.core.step.Step;
34+
import org.springframework.batch.core.step.builder.ChunkOrientedStepBuilder;
35+
import org.springframework.batch.item.support.ListItemReader;
36+
import org.springframework.context.ApplicationContext;
37+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
38+
import org.springframework.context.annotation.Bean;
39+
import org.springframework.context.annotation.Configuration;
40+
41+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
42+
43+
/**
44+
* Integration tests for observability features in {@link ChunkOrientedStep}.
45+
*
46+
* @author Mahmoud Ben Hassine
47+
*/
48+
public class ChunkOrientedStepObservabilityIntegrationTests {
49+
50+
@Test
51+
void testChunkOrientedStepMetrics() throws Exception {
52+
// given
53+
ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class);
54+
SimpleMeterRegistry meterRegistry = context.getBean(SimpleMeterRegistry.class);
55+
JobOperator jobOperator = context.getBean(JobOperator.class);
56+
Job job = context.getBean(Job.class);
57+
58+
// when
59+
JobExecution jobExecution = jobOperator.start(job, new JobParameters());
60+
61+
// then
62+
Assertions.assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
63+
Assertions.assertEquals(3, meterRegistry.getMeters().size());
64+
assertDoesNotThrow(
65+
() -> meterRegistry.get("spring.batch.item.read")
66+
.tag("spring.batch.item.read.job.name", "job")
67+
.tag("spring.batch.item.read.step.name", "step")
68+
.tag("spring.batch.item.read.status", "SUCCESS")
69+
.timer(),
70+
"There should be a meter of type TIMER named spring.batch.item.read registered in the meter registry");
71+
assertDoesNotThrow(
72+
() -> meterRegistry.get("spring.batch.item.process")
73+
.tag("spring.batch.item.process.job.name", "job")
74+
.tag("spring.batch.item.process.step.name", "step")
75+
.tag("spring.batch.item.process.status", "SUCCESS")
76+
.timer(),
77+
"There should be a meter of type TIMER named spring.batch.item.process registered in the meter registry");
78+
assertDoesNotThrow(
79+
() -> meterRegistry.get("spring.batch.chunk.write")
80+
.tag("spring.batch.chunk.write.job.name", "job")
81+
.tag("spring.batch.chunk.write.step.name", "step")
82+
.tag("spring.batch.chunk.write.status", "SUCCESS")
83+
.timer(),
84+
"There should be a meter of type TIMER named spring.batch.chunk.write registered in the meter registry");
85+
}
86+
87+
@Configuration
88+
@EnableBatchProcessing
89+
static class TestConfiguration {
90+
91+
@Bean
92+
public Job job(JobRepository jobRepository, Step step) {
93+
return new JobBuilder(jobRepository).start(step).build();
94+
}
95+
96+
@Bean
97+
public Step step(JobRepository jobRepository, MeterRegistry meterRegistry) {
98+
return new ChunkOrientedStepBuilder<String, String>(jobRepository, 2)
99+
.reader(new ListItemReader<>(List.of("one", "two", "three", "four", "five")))
100+
.processor(String::toUpperCase)
101+
.writer(items -> {
102+
})
103+
.meterRegistry(meterRegistry)
104+
.build();
105+
}
106+
107+
@Bean
108+
public SimpleMeterRegistry meterRegistry() {
109+
return new SimpleMeterRegistry();
110+
}
111+
112+
}
113+
114+
}

0 commit comments

Comments
 (0)