diff --git a/src/main/java/org/breedinginsight/api/v1/controller/ExperimentController.java b/src/main/java/org/breedinginsight/api/v1/controller/ExperimentController.java index 8619a140a..fe5436cd6 100644 --- a/src/main/java/org/breedinginsight/api/v1/controller/ExperimentController.java +++ b/src/main/java/org/breedinginsight/api/v1/controller/ExperimentController.java @@ -5,6 +5,7 @@ import io.micronaut.http.HttpStatus; import io.micronaut.http.MediaType; import io.micronaut.http.annotation.*; +import io.micronaut.http.exceptions.HttpStatusException; import io.micronaut.http.server.types.files.StreamedFile; import io.micronaut.security.annotation.Secured; import io.micronaut.security.rules.SecurityRule; @@ -134,6 +135,9 @@ public HttpResponse> createSubEntityDataset( Response response = new Response(experimentService.createSubEntityDataset(programOptional.get(), experimentId, datasetRequest)); return HttpResponse.ok(response); + } catch (HttpStatusException e) { + log.info(e.getMessage()); + return HttpResponse.status(e.getStatus(), e.getMessage()); } catch (Exception e){ log.info(e.getMessage()); return HttpResponse.status(HttpStatus.UNPROCESSABLE_ENTITY, e.getMessage()); diff --git a/src/main/java/org/breedinginsight/brapi/v2/dao/BrAPIObservationLevelDAO.java b/src/main/java/org/breedinginsight/brapi/v2/dao/BrAPIObservationLevelDAO.java new file mode 100644 index 000000000..1af5aa33a --- /dev/null +++ b/src/main/java/org/breedinginsight/brapi/v2/dao/BrAPIObservationLevelDAO.java @@ -0,0 +1,90 @@ +/* + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.breedinginsight.brapi.v2.dao; + +import com.google.gson.Gson; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; +import lombok.extern.slf4j.Slf4j; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.Request; +import okhttp3.RequestBody; +import org.brapi.client.v2.JSON; +import org.brapi.client.v2.model.exceptions.ApiException; +import org.brapi.v2.model.pheno.BrAPIObservationUnitHierarchyLevel; +import org.breedinginsight.model.DatasetLevel; +import org.breedinginsight.model.Program; +import org.breedinginsight.utilities.BrAPIDAOUtil; + +import javax.inject.Inject; +import javax.inject.Singleton; + +@Slf4j +@Singleton +public class BrAPIObservationLevelDAO { + + private static final MediaType JSON_MEDIA_TYPE = MediaType.get("application/json"); + private final BrAPIDAOUtil brAPIDAOUtil; + private final Gson gson = new JSON().getGson(); + + @Inject + public BrAPIObservationLevelDAO(BrAPIDAOUtil brAPIDAOUtil) { + this.brAPIDAOUtil = brAPIDAOUtil; + } + + public HttpResponse createObservationLevelName(Program program, String levelName, DatasetLevel levelOrder) throws ApiException { + HttpUrl url = HttpUrl.parse(brAPIDAOUtil.getProgramBrAPIBaseUrl(program.getId())) + .newBuilder() + .addPathSegment("observationlevelnames") + .build(); + BrAPIObservationUnitHierarchyLevel level = new BrAPIObservationUnitHierarchyLevel() + .levelName(levelName); + if (levelOrder != null) { + level.setLevelOrder(levelOrder.getValue()); + } + RequestBody body = RequestBody.create(gson.toJson(level), JSON_MEDIA_TYPE); + var request = new Request.Builder() + .url(url) + .post(body) + .addHeader("Content-Type", "application/json") + .build(); + return brAPIDAOUtil.makeCall(request); + } + + public void deleteObservationLevelName(Program program, String levelName) { + HttpUrl url = HttpUrl.parse(brAPIDAOUtil.getProgramBrAPIBaseUrl(program.getId())) + .newBuilder() + .addPathSegment("observationlevelnames") + .addPathSegment(levelName) + .build(); + var request = new Request.Builder() + .url(url) + .delete() + .addHeader("Content-Type", "application/json") + .build(); + try { + HttpResponse response = brAPIDAOUtil.makeCall(request); + if (response.getStatus() != HttpStatus.OK && response.getStatus() != HttpStatus.NO_CONTENT && response.getStatus() != HttpStatus.ACCEPTED) { + log.warn("Observation level delete returned status {} for {}", response.getStatus(), levelName); + } + } catch (Exception e) { + log.warn("Failed to delete observation level {} during rollback", levelName, e); + } + } +} diff --git a/src/main/java/org/breedinginsight/brapi/v2/dao/BrAPIObservationUnitDAO.java b/src/main/java/org/breedinginsight/brapi/v2/dao/BrAPIObservationUnitDAO.java index 9749bf093..5a86c2337 100644 --- a/src/main/java/org/breedinginsight/brapi/v2/dao/BrAPIObservationUnitDAO.java +++ b/src/main/java/org/breedinginsight/brapi/v2/dao/BrAPIObservationUnitDAO.java @@ -26,6 +26,8 @@ import io.micronaut.scheduling.annotation.Scheduled; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import okhttp3.HttpUrl; +import okhttp3.Request; import org.brapi.client.v2.JSON; import org.brapi.client.v2.model.exceptions.ApiException; import org.brapi.client.v2.modules.phenotype.ObservationUnitsApi; @@ -442,4 +444,32 @@ private void preprocessObservationUnits(List brapiObservat } } } + + public void deleteObservationUnits(Collection observationUnitDbIds, UUID programId) { + if (observationUnitDbIds == null || observationUnitDbIds.isEmpty()) { + return; + } + String baseUrl = brAPIDAOUtil.getProgramBrAPIBaseUrl(programId); + for (String ouDbId : observationUnitDbIds) { + if (StringUtils.isBlank(ouDbId)) { + continue; + } + HttpUrl url = HttpUrl.parse(baseUrl) + .newBuilder() + .addPathSegment("observationunits") + .addPathSegment(ouDbId) + .build(); + Request request = new Request.Builder() + .url(url) + .delete() + .addHeader("Content-Type", "application/json") + .build(); + try { + brAPIDAOUtil.makeCall(request); + } catch (Exception e) { + log.warn("Failed to delete observation unit {} during rollback", ouDbId, e); + } + } + repopulateCache(programId); + } } diff --git a/src/main/java/org/breedinginsight/brapi/v2/services/BrAPITrialService.java b/src/main/java/org/breedinginsight/brapi/v2/services/BrAPITrialService.java index 158ce952a..96337c026 100644 --- a/src/main/java/org/breedinginsight/brapi/v2/services/BrAPITrialService.java +++ b/src/main/java/org/breedinginsight/brapi/v2/services/BrAPITrialService.java @@ -6,7 +6,10 @@ import com.github.filosganga.geogson.model.positions.SinglePosition; import com.google.gson.JsonObject; import io.micronaut.context.annotation.Property; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; import io.micronaut.http.MediaType; +import io.micronaut.http.exceptions.HttpStatusException; import io.micronaut.http.server.exceptions.InternalServerException; import io.micronaut.http.server.types.files.StreamedFile; import lombok.extern.slf4j.Slf4j; @@ -41,12 +44,14 @@ import org.breedinginsight.utilities.FileUtil; import org.breedinginsight.utilities.Utilities; import org.jetbrains.annotations.NotNull; +import org.breedinginsight.services.lock.DistributedLockService; import javax.inject.Inject; import javax.inject.Singleton; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.time.Duration; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.util.*; @@ -54,6 +59,7 @@ import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import java.util.concurrent.TimeoutException; import static org.breedinginsight.brapps.importer.services.processors.experiment.model.ExpImportProcessConstants.OBSERVATION_UNIT_ID_SUFFIX; @@ -70,8 +76,10 @@ public class BrAPITrialService { private final BrAPIStudyDAO studyDAO; private final BrAPISeasonDAO seasonDAO; private final BrAPIObservationUnitDAO ouDAO; + private final BrAPIObservationLevelDAO observationLevelDAO; private final BrAPIGermplasmDAO germplasmDAO; private final FileMappingUtil fileMappingUtil; + private final DistributedLockService lockService; private static final String SHEET_NAME = "Data"; @Inject @@ -84,8 +92,10 @@ public BrAPITrialService(@Property(name = "brapi.server.reference-source") Strin BrAPIStudyDAO studyDAO, BrAPISeasonDAO seasonDAO, BrAPIObservationUnitDAO ouDAO, + BrAPIObservationLevelDAO observationLevelDAO, BrAPIGermplasmDAO germplasmDAO, - FileMappingUtil fileMappingUtil) { + FileMappingUtil fileMappingUtil, + DistributedLockService lockService) { this.referenceSource = referenceSource; this.trialDAO = trialDAO; @@ -96,8 +106,10 @@ public BrAPITrialService(@Property(name = "brapi.server.reference-source") Strin this.studyDAO = studyDAO; this.seasonDAO = seasonDAO; this.ouDAO = ouDAO; + this.observationLevelDAO = observationLevelDAO; this.germplasmDAO = germplasmDAO; this.fileMappingUtil = fileMappingUtil; + this.lockService = lockService; } public List getExperiments(UUID programId) throws ApiException, DoesNotExistException { @@ -189,7 +201,7 @@ public DownloadFile exportObservations( } //add obsUnitID as dynamic column with observation level appended to header - String observationLvl = ous.get(0).getAdditionalInfo().get(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL).getAsString(); + String observationLvl = requireObservationLevelName(ous.get(0)); columns = dynamicUpdateObsUnitIDLabel(columns, observationLvl); if (params.getDatasetId() != null) { @@ -394,57 +406,116 @@ public List getDatasetsMetadata(Program program, UUID experimen } public Dataset createSubEntityDataset(Program program, UUID experimentId, SubEntityDatasetRequest request) throws ApiException, DoesNotExistException { - log.debug("creating sub-entity dataset: \"" + request.getName() + "\" for experiment: \"" + experimentId + "\" with: \"" + request.getRepeatedMeasures() + "\" repeated measures."); - UUID subEntityDatasetId = UUID.randomUUID(); - List subObsUnits = new ArrayList<>(); - BrAPITrial experiment = getExperiment(program, experimentId); - // Get top level dataset ObservationUnits. - DatasetMetadata topLevelDataset = DatasetUtil.getTopLevelDataset(experiment); - if (topLevelDataset == null) { - log.error("Experiment {} has no top level dataset.", experiment.getTrialDbId()); - throw new RuntimeException("Cannot create sub-entity dataset for experiment without top level dataset."); - } - - List expOUs = ouDAO.getObservationUnitsForDataset(topLevelDataset.getId().toString(), program); - for (BrAPIObservationUnit expUnit : expOUs) { - - // Get environment number from study. - String envSeqValue = studyDAO.getStudyByDbId(expUnit.getStudyDbId(), program).orElseThrow() - .getAdditionalInfo().get(BrAPIAdditionalInfoFields.ENVIRONMENT_NUMBER).getAsString(); - - for (int i=1; i<=request.getRepeatedMeasures(); i++) { - // Create subObsUnit and add to list. - subObsUnits.add( - createSubObservationUnit( - request.getName(), - Integer.toString(i), - program, - envSeqValue, - expUnit, - this.referenceSource, - subEntityDatasetId, - UUID.randomUUID() - ) - ); - } - } + final String datasetName = request.getName().trim(); + String lockKey = String.format("sub-entity-dataset:%s", experimentId); + try { + return lockService.withLock(lockKey, Duration.ofSeconds(30), Duration.ofMinutes(5), () -> { + log.debug("creating sub-entity dataset: \"{}\" for experiment: \"{}\" with: \"{}\" repeated measures.", datasetName, experimentId, request.getRepeatedMeasures()); + UUID subEntityDatasetId = UUID.randomUUID(); + List subObsUnits = new ArrayList<>(); + List createdObservationUnits = new ArrayList<>(); + boolean createdObservationLevel = false; + BrAPITrial experiment = getExperiment(program, experimentId); + DatasetMetadata topLevelDataset = DatasetUtil.getTopLevelDataset(experiment); + if (topLevelDataset == null) { + log.error("Experiment {} has no top level dataset.", experiment.getTrialDbId()); + throw new RuntimeException("Cannot create sub-entity dataset for experiment without top level dataset."); + } - List createdObservationUnits = observationUnitDAO.createBrAPIObservationUnits(subObsUnits, program.getId()); + List existingDatasets = DatasetUtil.datasetsFromJson(experiment.getAdditionalInfo().getAsJsonArray(BrAPIAdditionalInfoFields.DATASETS)); + if (existingDatasets.stream().anyMatch(dataset -> dataset.getName().equalsIgnoreCase(datasetName))) { + throw new HttpStatusException(HttpStatus.CONFLICT, "Dataset name already exists in this experiment"); + } - // Add the new dataset metadata to the datasets array in the trial's additionalInfo. - DatasetMetadata subEntityDatasetMetadata = DatasetMetadata.builder() - .id(subEntityDatasetId) - .name(request.getName()) - .level(DatasetLevel.SUB_OBS_UNIT) - .build(); - List datasets = DatasetUtil.datasetsFromJson(experiment.getAdditionalInfo().getAsJsonArray(BrAPIAdditionalInfoFields.DATASETS)); - datasets.add(subEntityDatasetMetadata); - experiment.getAdditionalInfo().add(BrAPIAdditionalInfoFields.DATASETS, DatasetUtil.jsonArrayFromDatasets(datasets)); - // Ask the DAO to persist the updated trial. - trialDAO.updateBrAPITrial(experiment.getTrialDbId(), experiment, program.getId()); + HttpResponse levelResponse = observationLevelDAO.createObservationLevelName(program, datasetName, DatasetLevel.SUB_OBS_UNIT); + if (levelResponse.getStatus() == HttpStatus.CONFLICT) { + throw new HttpStatusException(HttpStatus.CONFLICT, "Dataset name already exists in this experiment"); + } else if (levelResponse.getStatus().getCode() < 200 || levelResponse.getStatus().getCode() >= 300) { + throw new ApiException(levelResponse.getStatus().getCode(), "Unable to create observation level: " + levelResponse.getStatus().getReason()); + } + createdObservationLevel = true; + + try { + List expOUs = ouDAO.getObservationUnitsForDataset(topLevelDataset.getId().toString(), program); + for (BrAPIObservationUnit expUnit : expOUs) { + + String envSeqValue = studyDAO.getStudyByDbId(expUnit.getStudyDbId(), program).orElseThrow() + .getAdditionalInfo().get(BrAPIAdditionalInfoFields.ENVIRONMENT_NUMBER).getAsString(); + + for (int i=1; i<=request.getRepeatedMeasures(); i++) { + subObsUnits.add( + createSubObservationUnit( + datasetName, + Integer.toString(i), + program, + envSeqValue, + expUnit, + this.referenceSource, + subEntityDatasetId, + UUID.randomUUID() + ) + ); + } + } + + createdObservationUnits = observationUnitDAO.createBrAPIObservationUnits(subObsUnits, program.getId()); + + DatasetMetadata subEntityDatasetMetadata = DatasetMetadata.builder() + .id(subEntityDatasetId) + .name(datasetName) + .level(DatasetLevel.SUB_OBS_UNIT) + .build(); + + // Refresh experiment so we merge with the latest dataset metadata and avoid clobbering concurrent updates. + BrAPITrial latestExperiment = getExperiment(program, experimentId); + List datasets = DatasetUtil.datasetsFromJson(latestExperiment.getAdditionalInfo().getAsJsonArray(BrAPIAdditionalInfoFields.DATASETS)); + if (datasets.stream().anyMatch(dataset -> dataset.getName().equalsIgnoreCase(datasetName))) { + throw new HttpStatusException(HttpStatus.CONFLICT, "Dataset name already exists in this experiment"); + } + datasets.add(subEntityDatasetMetadata); + latestExperiment.getAdditionalInfo().add(BrAPIAdditionalInfoFields.DATASETS, DatasetUtil.jsonArrayFromDatasets(datasets)); + trialDAO.updateBrAPITrial(latestExperiment.getTrialDbId(), latestExperiment, program.getId()); + + return getDatasetData(program, experimentId, subEntityDatasetId, false); + } catch (Exception e) { + rollbackSubEntityDataset(program, datasetName, createdObservationUnits, createdObservationLevel); + throw e; + } + }); + } catch (TimeoutException e) { + throw new HttpStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Dataset creation is busy, please retry"); + } catch (HttpStatusException e) { + throw e; + } catch (Exception e) { + if (e instanceof ApiException) { + throw (ApiException) e; + } + if (e instanceof DoesNotExistException) { + throw (DoesNotExistException) e; + } + throw new RuntimeException("Unexpected error creating sub-entity dataset", e); + } + } - // Return the new dataset. - return getDatasetData(program, experimentId, subEntityDatasetId, false); + private void rollbackSubEntityDataset(Program program, String datasetName, List createdObservationUnits, boolean createdObservationLevel) { + if (createdObservationUnits != null && !createdObservationUnits.isEmpty()) { + try { + List observationUnitDbIds = createdObservationUnits.stream() + .map(BrAPIObservationUnit::getObservationUnitDbId) + .filter(StringUtils::isNotBlank) + .collect(Collectors.toList()); + observationUnitDAO.deleteObservationUnits(observationUnitDbIds, program.getId()); + } catch (Exception err) { + log.warn("Failed to delete observation units for dataset {} during rollback", datasetName, err); + } + } + if (createdObservationLevel) { + try { + observationLevelDAO.deleteObservationLevelName(program, datasetName); + } catch (Exception err) { + log.warn("Failed to delete observation level {} during rollback", datasetName, err); + } + } } public BrAPIObservationUnit createSubObservationUnit( @@ -512,8 +583,6 @@ public BrAPIObservationUnit createSubObservationUnit( observationUnit.setTreatments(treatmentFactors); } - // Put level in additional info: keep this in case we decide to rename levels in future. - observationUnit.putAdditionalInfoItem(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL, subEntityDatasetName); // Put RTK in additional info. JsonElement rtk = expUnit.getAdditionalInfo().get(BrAPIAdditionalInfoFields.RTK); if (rtk != null) { @@ -533,14 +602,14 @@ public BrAPIObservationUnit createSubObservationUnit( // ObservationLevel entry for Sub-Obs Unit. BrAPIObservationUnitLevelRelationship level = new BrAPIObservationUnitLevelRelationship(); - // TODO: consider removing toLowerCase() after BI-2219 is implemented. - level.setLevelName(subEntityDatasetName.toLowerCase()); + level.setLevelName(subEntityDatasetName); level.setLevelCode(Utilities.appendProgramKey(subUnitId, program.getKey(), seqVal)); level.setLevelOrder(DatasetLevel.SUB_OBS_UNIT.getValue()); position.setObservationLevel(level); // ObservationLevelRelationships. List levelRelationships = new ArrayList<>(); + levelRelationships.add(level); // ObservationLevelRelationships for block. BrAPIObservationUnitLevelRelationship expBlockLevel = expUnit.getObservationUnitPosition() .getObservationLevelRelationships().stream() @@ -565,8 +634,7 @@ public BrAPIObservationUnit createSubObservationUnit( } // ObservationLevelRelationships for top-level Exp Unit linking. BrAPIObservationUnitLevelRelationship expUnitLevel = new BrAPIObservationUnitLevelRelationship(); - // TODO: consider removing toLowerCase() after BI-2219 is implemented. - expUnitLevel.setLevelName(expUnit.getAdditionalInfo().get(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL).getAsString().toLowerCase()); + expUnitLevel.setLevelName(requireObservationLevelName(expUnit)); String expUnitUUID = Utilities.getExternalReference(expUnit.getExternalReferences(), referenceSource, ExternalReferenceSource.OBSERVATION_UNITS).orElseThrow().getReferenceId(); expUnitLevel.setLevelCode(Utilities.appendProgramKey(expUnitUUID, program.getKey(), seqVal)); expUnitLevel.setLevelOrder(DatasetLevel.EXP_UNIT.getValue()); @@ -581,6 +649,29 @@ public BrAPIObservationUnit createSubObservationUnit( return observationUnit; } + private String getObservationLevelName(BrAPIObservationUnit observationUnit) { + if (observationUnit.getObservationUnitPosition() != null + && observationUnit.getObservationUnitPosition().getObservationLevel() != null + && StringUtils.isNotBlank(observationUnit.getObservationUnitPosition().getObservationLevel().getLevelName())) { + return observationUnit.getObservationUnitPosition().getObservationLevel().getLevelName(); + } + JsonObject additionalInfo = observationUnit.getAdditionalInfo(); + if (additionalInfo != null + && additionalInfo.has(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL) + && !additionalInfo.get(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL).isJsonNull()) { + return additionalInfo.get(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL).getAsString(); + } + return null; + } + + private String requireObservationLevelName(BrAPIObservationUnit observationUnit) { + String levelName = getObservationLevelName(observationUnit); + if (StringUtils.isBlank(levelName)) { + throw new RuntimeException("Observation level not found for observation unit " + observationUnit.getObservationUnitDbId()); + } + return levelName; + } + private void addBrAPIObsToRecords( List dataset, BrAPITrial experiment, @@ -750,7 +841,7 @@ private Map createExportRow( row.put(ExperimentObservation.Columns.TEST_CHECK, testCheck); row.put(ExperimentObservation.Columns.EXP_TITLE, Utilities.removeProgramKey(experiment.getTrialName(), program.getKey())); row.put(ExperimentObservation.Columns.EXP_DESCRIPTION, experiment.getTrialDescription()); - row.put(ExperimentObservation.Columns.EXP_UNIT, ou.getAdditionalInfo().getAsJsonObject().get(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL).getAsString()); + row.put(ExperimentObservation.Columns.EXP_UNIT, requireObservationLevelName(ou)); row.put(ExperimentObservation.Columns.EXP_TYPE, experiment.getAdditionalInfo().getAsJsonObject().get(BrAPIAdditionalInfoFields.EXPERIMENT_TYPE).getAsString()); row.put(ExperimentObservation.Columns.ENV, Utilities.removeProgramKeyAndUnknownAdditionalData(study.getStudyName(), program.getKey())); row.put(ExperimentObservation.Columns.ENV_LOCATION, Utilities.removeProgramKey(study.getLocationName(), program.getKey())); @@ -807,7 +898,7 @@ private Map createExportRow( } //Append observation level to obsUnitID - String observationLvl = ou.getAdditionalInfo().getAsJsonObject().get(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL).getAsString(); + String observationLvl = requireObservationLevelName(ou); row.put(observationLvl + " " + OBSERVATION_UNIT_ID_SUFFIX, ouId); return row; diff --git a/src/main/java/org/breedinginsight/services/lock/DistributedLockService.java b/src/main/java/org/breedinginsight/services/lock/DistributedLockService.java new file mode 100644 index 000000000..06f141f33 --- /dev/null +++ b/src/main/java/org/breedinginsight/services/lock/DistributedLockService.java @@ -0,0 +1,61 @@ +package org.breedinginsight.services.lock; + +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; + +import javax.inject.Inject; +import javax.inject.Singleton; +import java.time.Duration; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Small helper to provide a consistent pattern for distributed locks across the service layer. + */ +@Slf4j +@Singleton +public class DistributedLockService { + + private final RedissonClient redissonClient; + + @Inject + public DistributedLockService(RedissonClient redissonClient) { + this.redissonClient = redissonClient; + } + + /** + * Execute the given callback guarded by a distributed lock. + * + * @param lockKey the key for the distributed lock + * @param waitTime how long to wait to acquire the lock + * @param leaseTime how long before the lock automatically releases + * @param action the work to run while holding the lock + * @return result of the callback + * @throws TimeoutException if the lock cannot be acquired within the wait time + * @throws Exception bubbled up from the callback + */ + public T withLock(String lockKey, Duration waitTime, Duration leaseTime, Callable action) throws Exception { + RLock lock = redissonClient.getLock(lockKey); + boolean acquired = false; + try { + acquired = lock.tryLock(waitTime.toMillis(), leaseTime.toMillis(), TimeUnit.MILLISECONDS); + if (!acquired) { + throw new TimeoutException("Unable to acquire lock " + lockKey); + } + return action.call(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TimeoutException("Interrupted while acquiring lock " + lockKey); + } finally { + if (acquired && lock.isHeldByCurrentThread()) { + try { + lock.unlock(); + } catch (Exception e) { + log.warn("Failed to release lock {}", lockKey, e); + } + } + } + } +} diff --git a/src/test/java/org/breedinginsight/brapi/v2/SubEntityDatasetLockIntegrationTest.java b/src/test/java/org/breedinginsight/brapi/v2/SubEntityDatasetLockIntegrationTest.java new file mode 100644 index 000000000..de48032bc --- /dev/null +++ b/src/test/java/org/breedinginsight/brapi/v2/SubEntityDatasetLockIntegrationTest.java @@ -0,0 +1,108 @@ +package org.breedinginsight.brapi.v2; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.micronaut.context.annotation.Property; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; +import io.micronaut.http.MediaType; +import io.micronaut.http.client.RxHttpClient; +import io.micronaut.http.client.annotation.Client; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import io.reactivex.Flowable; +import org.breedinginsight.BrAPITest; +import org.breedinginsight.model.Program; +import org.junit.jupiter.api.*; + +import javax.inject.Inject; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@MicronautTest +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class SubEntityDatasetLockIntegrationTest extends BrAPITest { + + private Program program; + private String experimentId; + + @Inject + private BrAPITestUtils brAPITestUtils; + + @Inject + @Client("/${micronaut.bi.api.version}") + private RxHttpClient client; + + private final Gson gson = new GsonBuilder().registerTypeAdapter(OffsetDateTime.class, (json, type, context) -> OffsetDateTime.parse(json.getAsString())).create(); + + @BeforeAll + void setup() throws Exception { + var setup = brAPITestUtils.setupTestProgram(super.getBrapiDsl(), gson); + program = setup.getV1(); + experimentId = setup.getV2().get(0); + } + + @Test + void concurrentDatasetCreateReturnsSingleSuccessAndConflict() throws Exception { + // Use a fresh name to avoid interference with other runs + String datasetName = "LockTest-" + UUID.randomUUID(); + JsonObject request = new JsonObject(); + request.addProperty("name", datasetName); + request.addProperty("repeatedMeasures", 1); + + ExecutorService executor = Executors.newFixedThreadPool(2); + CountDownLatch start = new CountDownLatch(1); + + Callable call = () -> { + start.await(1, TimeUnit.SECONDS); + Flowable> response = client.exchange( + HttpRequest.POST(String.format("/programs/%s/experiments/%s/dataset", program.getId(), experimentId), request.toString()) + .contentType(MediaType.APPLICATION_JSON) + .bearerAuth("test-registered-user"), + String.class + ); + return response.blockingFirst().getStatus(); + }; + + Future first = executor.submit(call); + Future second = executor.submit(call); + start.countDown(); + + HttpStatus status1 = first.get(10, TimeUnit.SECONDS); + HttpStatus status2 = second.get(10, TimeUnit.SECONDS); + executor.shutdownNow(); + + List statuses = List.of(status1, status2); + assertTrue(statuses.contains(HttpStatus.OK)); + assertTrue(statuses.contains(HttpStatus.CONFLICT)); + + // Confirm only one dataset with that name exists + Flowable> datasetsCall = client.exchange( + HttpRequest.GET(String.format("/programs/%s/experiments/%s/datasets", program.getId(), experimentId)) + .bearerAuth("test-registered-user"), + String.class + ); + HttpResponse datasetsResponse = datasetsCall.blockingFirst(); + assertEquals(HttpStatus.OK, datasetsResponse.getStatus()); + var datasetsJson = JsonParser.parseString(Objects.requireNonNull(datasetsResponse.body())).getAsJsonObject() + .getAsJsonObject("result") + .getAsJsonArray("data"); + long matching = 0; + for (int i = 0; i < datasetsJson.size(); i++) { + String name = datasetsJson.get(i).getAsJsonObject().get("name").getAsString(); + if (name.equalsIgnoreCase(datasetName)) { + matching++; + } + } + assertEquals(1, matching); + } +} diff --git a/src/test/java/org/breedinginsight/services/lock/DistributedLockServiceTest.java b/src/test/java/org/breedinginsight/services/lock/DistributedLockServiceTest.java new file mode 100644 index 000000000..4a0522c70 --- /dev/null +++ b/src/test/java/org/breedinginsight/services/lock/DistributedLockServiceTest.java @@ -0,0 +1,57 @@ +package org.breedinginsight.services.lock; + +import org.breedinginsight.DatabaseTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class DistributedLockServiceTest extends DatabaseTest { + + private final ExecutorService executor = Executors.newFixedThreadPool(2); + + @AfterEach + void cleanup() { + executor.shutdownNow(); + } + + @Test + void secondLockAttemptTimesOutWhileFirstHolds() throws Exception { + DistributedLockService lockService = new DistributedLockService(super.getRedisConnection()); + String lockKey = "test-lock-key"; + + CountDownLatch firstAcquired = new CountDownLatch(1); + CountDownLatch releaseFirst = new CountDownLatch(1); + + Future firstCall = executor.submit(() -> + lockService.withLock(lockKey, Duration.ofMillis(500), Duration.ofSeconds(5), () -> { + firstAcquired.countDown(); + // keep the lock held until signaled + releaseFirst.await(2, TimeUnit.SECONDS); + return "first"; + }) + ); + + assertTrue(firstAcquired.await(1, TimeUnit.SECONDS), "First lock holder did not start in time"); + + assertThrows(TimeoutException.class, () -> + lockService.withLock(lockKey, Duration.ofMillis(100), Duration.ofSeconds(2), () -> "second") + ); + + releaseFirst.countDown(); + assertEquals("first", firstCall.get(2, TimeUnit.SECONDS)); + + String afterRelease = lockService.withLock(lockKey, Duration.ofMillis(500), Duration.ofSeconds(2), () -> "after-release"); + assertEquals("after-release", afterRelease); + } +}