1919import static org .junit .Assert .assertEquals ;
2020
2121/**
22+ * The Batch specification provides a Chunk Oriented processing style. This style is defined by enclosing into a
23+ * transaction a set of reads, process and write operations via +javax.batch.api.chunk.ItemReader+,
24+ * +javax.batch.api.chunk.ItemProcessor+ and +javax.batch.api.chunk.ItemWriter+. Items are read one at a time, processed
25+ * and aggregated. The transaction is then committed when the defined +checkpoint-policy+ is triggered.
26+ *
27+ * Many batch processing problems can be solved with single threaded, single process jobs, but the Batch specification
28+ * allows for steps to be executed as a partitioned step, meaning that the step can be parallelized across multiple
29+ * threads. This is useful if you have some kind of bottleneck or if you can considerable boost your batch processing
30+ * performance by splitting the work to be done.
31+ *
32+ * You can define the number of partitions and the number of threads by using the element +partition+ in the +step+
33+ * definition. Each partition is required to receive a set of unique parameters that instruct it into which data it
34+ * should operate.
35+ *
36+ * Since each thread runs a separate copy of the step, chunking and checkpointing occur independently on each thread for
37+ * chunk type steps.
38+ *
39+ * include::myJob.xml[]
40+ *
41+ * A job is defined in the +myJob.xml+ file. Just a single step with a reader, a processor and a writer. This step also
42+ * defines that the step should be executed in two separate partitions: "1" and "2". Properties for each partition
43+ * define the data that is going to be read. For partition "1" we start on 1 and end on 10. For partition "2" we start
44+ * on 11 and end on 20. The +MyItemReader+ will generate the data based on these properties.
45+ *
46+ * include::MyItemReader[]
47+ *
2248 * @author Roberto Cortez
2349 */
2450@ RunWith (Arquillian .class )
2551public class BatchChunkPartitionTest {
52+ /**
53+ * We're just going to deploy the application as a +web archive+. Note the inclusion of the following files:
54+ *
55+ * [source,file]
56+ * ----
57+ * /META-INF/batch-jobs/myjob.xml
58+ * ----
59+ *
60+ * The +myjob.xml+ file is needed for running the batch definition.
61+ */
2662 @ Deployment
2763 public static WebArchive createDeployment () {
2864 WebArchive war = ShrinkWrap .create (WebArchive .class )
@@ -34,6 +70,16 @@ public static WebArchive createDeployment() {
3470 return war ;
3571 }
3672
73+ /**
74+ * In the test, we're just going to invoke the batch execution and wait for completion. To validate the test
75+ * expected behaviour we need to query the +Metric[]+ object available in the step execution.
76+ *
77+ * The batch process itself will read and process 20 elements from numbers 1 to 20, but only write the odd
78+ * elements. Elements from 1 to 10 will be processed in one partition and elements from 11 to 20 in another
79+ * partition. Commits are executed after 3 elements are read by partition.
80+ *
81+ * @throws Exception an exception if the batch could not complete successfully.
82+ */
3783 @ Test
3884 public void testBatchChunkPartition () throws Exception {
3985 JobOperator jobOperator = BatchRuntime .getJobOperator ();
@@ -47,15 +93,19 @@ public void testBatchChunkPartition() throws Exception {
4793 if (stepExecution .getStepName ().equals ("myStep" )) {
4894 Map <Metric .MetricType , Long > metricsMap = BatchTestHelper .getMetricsMap (stepExecution .getMetrics ());
4995
96+ // <1> The read count should be 20 elements. Check +MyItemReader+.
5097 assertEquals (20L , metricsMap .get (Metric .MetricType .READ_COUNT ).longValue ());
98+ // <2> The write count should be 10. Only half of the elements read are processed to be written.
5199 assertEquals (10L , metricsMap .get (Metric .MetricType .WRITE_COUNT ).longValue ());
52100 // Number of elements by the item count value on myJob.xml, plus an additional transaction for the
53101 // remaining elements by each partition.
54- long commitCount = 20L / 3 + (20 % 3 > 0 ? 1 : 0 ) * 2 ;
102+ long commitCount = (10L / 3 + (10 % 3 > 0 ? 1 : 0 )) * 2 ;
103+ // <3> The commit count should be 8. Checkpoint is on every 3rd read, 4 commits for read elements and 2 partitions.
55104 assertEquals (commitCount , metricsMap .get (Metric .MetricType .COMMIT_COUNT ).longValue ());
56105 }
57106 }
58107
108+ // <4> Job should be completed.
59109 assertEquals (BatchStatus .COMPLETED , jobExecution .getBatchStatus ());
60110 }
61111}
0 commit comments