Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ protected JobKeyGenerator getJobKeyGenerator() {
* @since 6.0
*/
protected DataFieldMaxValueIncrementer getJobInstanceIncrementer() {
return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_JOB_INSTANCE_SEQ");
return new MongoSequenceIncrementer();
}

/**
Expand All @@ -168,7 +168,7 @@ protected DataFieldMaxValueIncrementer getJobInstanceIncrementer() {
* @since 6.0
*/
protected DataFieldMaxValueIncrementer getJobExecutionIncrementer() {
return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_JOB_EXECUTION_SEQ");
return new MongoSequenceIncrementer();
}

/**
Expand All @@ -177,7 +177,7 @@ protected DataFieldMaxValueIncrementer getJobExecutionIncrementer() {
* @since 6.0
*/
protected DataFieldMaxValueIncrementer getStepExecutionIncrementer() {
return new MongoSequenceIncrementer(getMongoOperations(), "BATCH_STEP_EXECUTION_SEQ");
return new MongoSequenceIncrementer();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public class MongoJobExecutionDao implements JobExecutionDao {

private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";

private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "BATCH_JOB_EXECUTION_SEQ";

private final MongoOperations mongoOperations;

private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter();
Expand All @@ -62,7 +60,7 @@ public class MongoJobExecutionDao implements JobExecutionDao {

public MongoJobExecutionDao(MongoOperations mongoOperations) {
this.mongoOperations = mongoOperations;
this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME);
this.jobExecutionIncrementer = new MongoSequenceIncrementer();
}

public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public class MongoJobInstanceDao implements JobInstanceDao {

private static final String COLLECTION_NAME = "BATCH_JOB_INSTANCE";

private static final String SEQUENCE_NAME = "BATCH_JOB_INSTANCE_SEQ";

private final MongoOperations mongoOperations;

private DataFieldMaxValueIncrementer jobInstanceIncrementer;
Expand All @@ -57,7 +55,7 @@ public class MongoJobInstanceDao implements JobInstanceDao {
public MongoJobInstanceDao(MongoOperations mongoOperations) {
Assert.notNull(mongoOperations, "mongoOperations must not be null.");
this.mongoOperations = mongoOperations;
this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, SEQUENCE_NAME);
this.jobInstanceIncrementer = new MongoSequenceIncrementer();
}

public void setJobKeyGenerator(JobKeyGenerator jobKeyGenerator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,12 @@
*/
package org.springframework.batch.core.repository.dao.mongodb;

import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import org.bson.Document;

import org.springframework.dao.DataAccessException;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;

// Based on https://www.mongodb.com/blog/post/generating-globally-unique-identifiers-for-use-with-mongodb
// Section: Use a single counter document to generate unique identifiers one at a time
import java.net.InetAddress;
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author Mahmoud Ben Hassine
Expand All @@ -33,22 +29,35 @@
*/
public class MongoSequenceIncrementer implements DataFieldMaxValueIncrementer {

private final MongoOperations mongoTemplate;
private static final int NODE_BITS = 10;
private static final int SEQUENCE_BITS = 12;
private static final int NODE_SHIFT = SEQUENCE_BITS;
private static final int TIMESTAMP_SHIFT = NODE_BITS + SEQUENCE_BITS;
private static final int SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1;
private static final int NODE_MASK = (1 << NODE_BITS) - 1;

private static final long TSID_EPOCH = 1577836800000L;

private final int nodeId;
private final AtomicInteger sequence = new AtomicInteger(0);
private volatile long lastTimestamp = -1L;

private static final SecureRandom random = new SecureRandom();

private final String sequenceName;
/**
 * Create an incrementer whose node ID is derived automatically from the host
 * name, process ID and a random value (see {@code calculateNodeId()}), so that
 * independently started instances are unlikely to share a node ID.
 */
public MongoSequenceIncrementer() {
this.nodeId = calculateNodeId();
}

public MongoSequenceIncrementer(MongoOperations mongoTemplate, String sequenceName) {
this.mongoTemplate = mongoTemplate;
this.sequenceName = sequenceName;
/**
 * Create an incrementer with an explicitly assigned node ID.
 *
 * @param nodeId the node identifier; must fit in the available node bits,
 * i.e. lie in the range {@code 0..NODE_MASK}
 * @throws IllegalArgumentException if {@code nodeId} is outside that range
 */
public MongoSequenceIncrementer(int nodeId) {
    // A value survives masking unchanged iff it already lies in [0, NODE_MASK].
    if ((nodeId & NODE_MASK) != nodeId) {
        throw new IllegalArgumentException("Node ID must be between 0 and " + NODE_MASK);
    }
    this.nodeId = nodeId;
}

@Override
public long nextLongValue() throws DataAccessException {
return mongoTemplate.execute("BATCH_SEQUENCES",
collection -> collection
.findOneAndUpdate(new Document("_id", sequenceName), new Document("$inc", new Document("count", 1)),
new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER))
.getLong("count"));
return generateTsid();
}

@Override
Expand All @@ -61,4 +70,45 @@ public String nextStringValue() throws DataAccessException {
throw new UnsupportedOperationException();
}

/**
 * Produce the next time-sorted 64-bit ID laid out as
 * {@code [timestamp | nodeId | sequence]}. Synchronized so that reads and
 * writes of {@code lastTimestamp} and the per-millisecond sequence counter
 * happen atomically per instance.
 */
private synchronized long generateTsid() {
    // Clamp against clock regression: never move behind the last issued timestamp.
    long timestamp = Math.max(System.currentTimeMillis() - TSID_EPOCH, lastTimestamp);

    long sequenceBits = 0;
    if (timestamp == lastTimestamp) {
        // Same millisecond as the previous ID: advance the per-millisecond counter.
        int seq = sequence.incrementAndGet() & SEQUENCE_MASK;
        if (seq == 0) {
            // Counter exhausted for this millisecond; spin until the clock advances.
            timestamp = waitNextMillis(lastTimestamp);
        }
        sequenceBits = seq;
    }
    else {
        // New millisecond: restart the counter at zero.
        sequence.set(0);
    }
    lastTimestamp = timestamp;
    return (timestamp << TIMESTAMP_SHIFT) | ((long) nodeId << NODE_SHIFT) | sequenceBits;
}

/**
 * Busy-wait until the epoch-relative clock strictly exceeds the given floor.
 *
 * @param floor the last timestamp that must be surpassed
 * @return the first epoch-relative millisecond value greater than {@code floor}
 */
private long waitNextMillis(long floor) {
    long candidate;
    do {
        candidate = System.currentTimeMillis() - TSID_EPOCH;
    } while (candidate <= floor);
    return candidate;
}

/**
 * Derive a best-effort node ID by mixing the host name hash, the JVM process
 * ID and a random value, masked to the node-bit width. NOTE(review): this is
 * probabilistic — distinct JVMs are not guaranteed to get distinct node IDs.
 *
 * @return a node ID in the range {@code 0..NODE_MASK}
 */
private int calculateNodeId() {
    try {
        long hostHash = InetAddress.getLocalHost().getHostName().hashCode();
        long pid = ProcessHandle.current().pid();
        long entropy = random.nextInt();
        return (int) ((hostHash ^ pid ^ entropy) & NODE_MASK);
    }
    catch (Exception resolutionFailure) {
        // Hostname resolution can fail (e.g. no DNS); fall back to clock/thread entropy.
        return (int) ((System.nanoTime() ^ Thread.currentThread().getId()) & NODE_MASK);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public class MongoStepExecutionDao implements StepExecutionDao {

private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION";

private static final String STEP_EXECUTIONS_SEQUENCE_NAME = "BATCH_STEP_EXECUTION_SEQ";

private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";

private final StepExecutionConverter stepExecutionConverter = new StepExecutionConverter();
Expand All @@ -59,7 +57,7 @@ public class MongoStepExecutionDao implements StepExecutionDao {

public MongoStepExecutionDao(MongoOperations mongoOperations) {
this.mongoOperations = mongoOperations;
this.stepExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, STEP_EXECUTIONS_SEQUENCE_NAME);
this.stepExecutionIncrementer = new MongoSequenceIncrementer();
}

public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecutionIncrementer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,13 @@ public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
Assert.notNull(this.mongoOperations, "MongoOperations must not be null.");
if (this.jobInstanceIncrementer == null) {
this.jobInstanceIncrementer = new MongoSequenceIncrementer(this.mongoOperations, "BATCH_JOB_INSTANCE_SEQ");
this.jobInstanceIncrementer = new MongoSequenceIncrementer();
}
if (this.jobExecutionIncrementer == null) {
this.jobExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations,
"BATCH_JOB_EXECUTION_SEQ");
this.jobExecutionIncrementer = new MongoSequenceIncrementer();
}
if (this.stepExecutionIncrementer == null) {
this.stepExecutionIncrementer = new MongoSequenceIncrementer(this.mongoOperations,
"BATCH_STEP_EXECUTION_SEQ");
this.stepExecutionIncrementer = new MongoSequenceIncrementer();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2024-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.repository.dao.mongodb;

import org.junit.jupiter.api.Test;
import org.springframework.dao.DataAccessException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.*;

/**
* Tests for {@link MongoSequenceIncrementer}.
*/
public class MongoSequenceIncrementerTests {

    /**
     * IDs generated sequentially from a single incrementer must be
     * monotonically non-decreasing: the timestamp occupies the high-order bits
     * and the per-millisecond sequence the low-order bits.
     */
    @Test
    void testTimeOrdering() throws DataAccessException {
        MongoSequenceIncrementer incrementer = new MongoSequenceIncrementer();
        List<Long> ids = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            ids.add(incrementer.nextLongValue());
        }

        List<Long> sorted = new ArrayList<>(ids);
        Collections.sort(sorted);
        assertEquals(sorted, ids, "IDs should be in time order");
    }

    /**
     * Concurrent generation from a shared incrementer must never produce
     * duplicates. Worker failures are collected and re-asserted on the main
     * thread, because an AssertionError thrown inside an executor task (the
     * previous {@code fail(...)} call) is swallowed by the ignored Future and
     * never fails the test directly.
     */
    @Test
    void testConcurrency() throws InterruptedException {
        MongoSequenceIncrementer incrementer = new MongoSequenceIncrementer();
        Set<Long> ids = Collections.synchronizedSet(new HashSet<>());
        List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
        int threadCount = 10;
        int idsPerThread = 100;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                try {
                    for (int j = 0; j < idsPerThread; j++) {
                        ids.add(incrementer.nextLongValue());
                    }
                }
                catch (Throwable t) {
                    failures.add(t);
                }
                finally {
                    latch.countDown();
                }
            });
        }

        // The await result was previously ignored: on timeout the assertions
        // below would run against partial data. Fail fast and explicitly instead.
        assertTrue(latch.await(10, TimeUnit.SECONDS), "Timed out waiting for ID generation threads");
        executor.shutdown();

        assertTrue(failures.isEmpty(), "ID generation should not fail: " + failures);
        assertEquals(threadCount * idsPerThread, ids.size(),
                "All IDs generated from multiple threads should be unique");
    }

    /**
     * The node ID must be recoverable from the bits directly above the
     * sequence bits, so IDs produced by distinct nodes in the same millisecond
     * cannot collide.
     */
    @Test
    void testNodeIdSeparation() throws DataAccessException {
        MongoSequenceIncrementer incrementer1 = new MongoSequenceIncrementer(1);
        MongoSequenceIncrementer incrementer2 = new MongoSequenceIncrementer(2);

        long id1 = incrementer1.nextLongValue();
        long id2 = incrementer2.nextLongValue();

        assertNotEquals(id1, id2, "IDs from different nodes should be different");

        // 12 = SEQUENCE_BITS, 0x3FF = 10-bit NODE_MASK in MongoSequenceIncrementer;
        // keep these literals in sync with the production constants.
        long nodeId1 = (id1 >> 12) & 0x3FF;
        long nodeId2 = (id2 >> 12) & 0x3FF;

        assertEquals(1, nodeId1, "First ID should have node ID 1");
        assertEquals(2, nodeId2, "Second ID should have node ID 2");
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,51 @@ void testJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) th
dump(stepExecutionsCollection, "step execution = ");
}

/**
 * Launch several jobs concurrently and verify that each completes and that the
 * repository collections contain exactly one document per instance/execution.
 * Worker-thread exceptions are captured instead of rethrown: a RuntimeException
 * thrown on a plain Thread is uncaught and only surfaced later as a null
 * execution, hiding the root cause.
 */
@Test
void testParallelJobExecution(@Autowired JobOperator jobOperator, @Autowired Job job) throws Exception {
    int parallelJobs = 10;
    Thread[] threads = new Thread[parallelJobs];
    JobExecution[] executions = new JobExecution[parallelJobs];
    Throwable[] failures = new Throwable[parallelJobs];

    for (int i = 0; i < parallelJobs; i++) {
        final int idx = i;
        threads[i] = new Thread(() -> {
            JobParameters jobParameters = new JobParametersBuilder()
                .addString("name", "foo" + idx)
                .addLocalDateTime("runtime", LocalDateTime.now())
                .toJobParameters();
            try {
                executions[idx] = jobOperator.start(job, jobParameters);
            } catch (Exception e) {
                // Record instead of rethrowing so the main thread can report it.
                failures[idx] = e;
            }
        });
        threads[i].start();
    }

    for (Thread t : threads) {
        t.join();
    }

    for (int i = 0; i < parallelJobs; i++) {
        Assertions.assertNull(failures[i], "Job " + i + " failed to start: " + failures[i]);
        Assertions.assertNotNull(executions[i]);
        Assertions.assertEquals(ExitStatus.COMPLETED, executions[i].getExitStatus());
    }

    MongoCollection<Document> jobInstancesCollection = mongoTemplate.getCollection("BATCH_JOB_INSTANCE");
    MongoCollection<Document> jobExecutionsCollection = mongoTemplate.getCollection("BATCH_JOB_EXECUTION");
    MongoCollection<Document> stepExecutionsCollection = mongoTemplate.getCollection("BATCH_STEP_EXECUTION");

    Assertions.assertEquals(parallelJobs, jobInstancesCollection.countDocuments());
    Assertions.assertEquals(parallelJobs, jobExecutionsCollection.countDocuments());
    // NOTE(review): assumes the job under test is configured with exactly two
    // steps per execution — confirm against the test's job definition.
    Assertions.assertEquals(parallelJobs * 2, stepExecutionsCollection.countDocuments());

    // dump results for inspection
    dump(jobInstancesCollection, "job instance = ");
    dump(jobExecutionsCollection, "job execution = ");
    dump(stepExecutionsCollection, "step execution = ");
}

private static void dump(MongoCollection<Document> collection, String prefix) {
for (Document document : collection.find()) {
System.out.println(prefix + document.toJson());
Expand Down