1919import static org .junit .Assert .assertEquals ;
2020
/**
 * The Batch specification provides a Chunk Oriented processing style. This style is defined by enclosing into a
 * transaction a set of read, process and write operations via +javax.batch.api.chunk.ItemReader+,
 * +javax.batch.api.chunk.ItemProcessor+ and +javax.batch.api.chunk.ItemWriter+. Items are read one at a time, processed
 * and aggregated. The transaction is then committed when the defined +checkpoint-policy+ is triggered.
 *
 * Many batch processing problems can be solved with single-threaded, single-process jobs, but the Batch specification
 * allows steps to be executed as a partitioned step, meaning that the step can be parallelized across multiple
 * threads. This is useful if you have some kind of bottleneck, or if you can considerably boost your batch processing
 * performance by splitting up the work to be done.
 *
 * You can define the number of partitions and the number of threads using a custom mapper. The custom mapper needs to
 * implement +javax.batch.api.partition.PartitionMapper+ and create a new +javax.batch.api.partition.PartitionPlan+ to
 * define the partitions' behaviour. Each partition is required to receive a set of unique parameters that tell it
 * which data it should operate on.
 *
 * Since each thread runs a separate copy of the step, chunking and checkpointing occur independently on each thread for
 * chunk-type steps.
 *
 * include::myJob.xml[]
 *
 * A job is defined in the +myJob.xml+ file: just a single step with a reader, a processor and a writer. This step also
 * defines that the step should be executed in a partition with a custom mapper:
 *
 * include::MyMapper[]
 *
 * The mapper defines 2 partitions and 2 threads. Properties for each partition define the data that is going to be
 * read. For the first partition we start on 1 and end on 10. For the second partition we start on 11 and end on 20. The
 * +MyItemReader+ will generate the data based on these properties.
 *
 * include::MyItemReader[]
 *
 * @author Roberto Cortez
 */
@RunWith(Arquillian.class)
public class BatchChunkMapperTest {
    /**
     * We're just going to deploy the application as a +web archive+. Note the inclusion of the following files:
     *
     * [source,file]
     * ----
     * /META-INF/batch-jobs/myjob.xml
     * ----
     *
     * The +myjob.xml+ file is needed for running the batch definition.
     */
    @Deployment
    public static WebArchive createDeployment() {
        WebArchive war = ShrinkWrap.create(WebArchive.class)
        // NOTE(review): the source view elides the rest of this archive definition here
        // (diff hunk "@@ -34,6 +75,16 @@") -- presumably the chained .addClass/.addAsResource
        // calls that package the job classes and myJob.xml; confirm against the full file.
        return war;
    }

    /**
     * In the test, we're just going to invoke the batch execution and wait for completion. To validate the test's
     * expected behaviour we need to query the +Metric[]+ object available in the step execution.
     *
     * The batch process itself will read and process 20 elements from numbers 1 to 20, but only write the odd
     * elements. Elements from 1 to 10 will be processed in one partition and elements from 11 to 20 in another
     * partition. Commits are executed after 3 elements are read, per partition.
     *
     * @throws Exception an exception if the batch could not complete successfully.
     */
    @Test
    public void testBatchChunkMapper() throws Exception {
        JobOperator jobOperator = BatchRuntime.getJobOperator();
        // NOTE(review): the source view elides the job start / wait-for-completion /
        // step-execution lookup here (diff hunk "@@ -47,16 +98,21 @@"); the loop closed
        // below presumably iterates the job's step executions -- confirm against the full file.
            if (stepExecution.getStepName().equals("myStep")) {
                Map<Metric.MetricType, Long> metricsMap = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());

                // <1> The read count should be 20 elements. Check +MyItemReader+.
                assertEquals(20L, metricsMap.get(Metric.MetricType.READ_COUNT).longValue());
                // <2> The write count should be 10. Only half of the elements read are processed to be written.
                assertEquals(10L, metricsMap.get(Metric.MetricType.WRITE_COUNT).longValue());
                // Number of elements divided by the item count value on myJob.xml, plus an additional transaction
                // for the remaining elements, multiplied by the number of partitions: each partition reads 10
                // elements in chunks of 3.
                long commitCount = (10L / 3 + (10 % 3 > 0 ? 1 : 0)) * 2;
                // <3> The commit count should be 8: 3 full chunks plus 1 partial chunk per partition, times 2 partitions.
                assertEquals(commitCount, metricsMap.get(Metric.MetricType.COMMIT_COUNT).longValue());
            }
        }

        // <4> Make sure that all the partitions were created.
        assertEquals(2L, MyItemReader.totalReaders);
        // <5> Job should be completed.
        assertEquals(BatchStatus.COMPLETED, jobExecution.getBatchStatus());
    }
}