From 1ea5cce3561d02182b5788dfafa115fbb0c611fb Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Thu, 20 Nov 2025 03:11:45 -0500 Subject: [PATCH 01/24] Add merge policy to block older segments from participating in merges --- .../index/LatestVersionFilterMergePolicy.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java diff --git a/solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java b/solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java new file mode 100644 index 00000000000..5063729d2d0 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java @@ -0,0 +1,52 @@ +package org.apache.solr.index; + +import java.io.IOException; +import java.util.Map; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.MergeTrigger; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.TieredMergePolicy; +import org.apache.lucene.util.Version; + +/** + * Only allows latest version segments to be considered for merges. That way a snapshot of older + * segments can remain consistent + */ +public class LatestVersionFilterMergePolicy extends MergePolicy { + MergePolicy delegatePolicy = new TieredMergePolicy(); + + @Override + public MergeSpecification findMerges( + MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException { + /*we don't want to remove from the original SegmentInfos, else the segments may not carry forward upon a commit. + That would be catastrophic. Hence we clone.*/ + SegmentInfos infosClone = infos.clone(); + infosClone.clear(); + for (SegmentCommitInfo info : infos) { + if (info.info.getMinVersion() != null + && info.info.getMinVersion().major == Version.LATEST.major) { + infosClone.add(info); + } + } + + return delegatePolicy.findMerges(mergeTrigger, infosClone, mergeContext); + } + + @Override + public MergeSpecification findForcedMerges( + SegmentInfos segmentInfos, + int maxSegmentCount, + Map segmentsToMerge, + MergeContext mergeContext) + throws IOException { + return delegatePolicy.findForcedMerges( + segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext); + } + + @Override + public MergeSpecification findForcedDeletesMerges( + SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { + return delegatePolicy.findForcedDeletesMerges(segmentInfos, mergeContext); + } +} From cdc9978b2581318eb22c50b36744b6fb06f98f07 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Wed, 26 Nov 2025 01:27:41 -0500 Subject: [PATCH 02/24] extend FilterMergePolicy for delegation and modularize logic --- .../index/LatestVersionFilterMergePolicy.java | 69 ++++++++++++------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java b/solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java index 5063729d2d0..5841f99fb75 100644 --- a/solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java +++ b/solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java @@ -2,51 +2,74 @@ import java.io.IOException; import java.util.Map; +import org.apache.lucene.index.FilterMergePolicy; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.MergeTrigger; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.util.Version; /** - * Only allows latest version segments to be considered for merges. That way a snapshot of older - * segments can remain consistent + * Prevents any older version segment (< {@link Version.LATEST}), either original or one derived as + * a result of merging with an older version segment, from being considered for merges. That way a + * snapshot of older segments remains consistent. This assists in upgrading to a future Lucene major + * version if existing documents are reindexed in the current version with this merge policy in + * place. */ -public class LatestVersionFilterMergePolicy extends MergePolicy { - MergePolicy delegatePolicy = new TieredMergePolicy(); +public class LatestVersionFilterMergePolicy extends FilterMergePolicy { + + public LatestVersionFilterMergePolicy(MergePolicy in) { + super(in); + } @Override public MergeSpecification findMerges( MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException { - /*we don't want to remove from the original SegmentInfos, else the segments may not carry forward upon a commit. - That would be catastrophic. Hence we clone.*/ - SegmentInfos infosClone = infos.clone(); - infosClone.clear(); - for (SegmentCommitInfo info : infos) { - if (info.info.getMinVersion() != null - && info.info.getMinVersion().major == Version.LATEST.major) { - infosClone.add(info); - } - } - - return delegatePolicy.findMerges(mergeTrigger, infosClone, mergeContext); + return in.findMerges(mergeTrigger, getFilteredInfosClone(infos), mergeContext); } @Override public MergeSpecification findForcedMerges( - SegmentInfos segmentInfos, + SegmentInfos infos, int maxSegmentCount, Map segmentsToMerge, MergeContext mergeContext) throws IOException { - return delegatePolicy.findForcedMerges( - segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext); + return in.findForcedMerges( + getFilteredInfosClone(infos), maxSegmentCount, segmentsToMerge, mergeContext); + } + + @Override + public MergeSpecification findForcedDeletesMerges(SegmentInfos infos, MergeContext mergeContext) + throws IOException { + return in.findForcedDeletesMerges(getFilteredInfosClone(infos), mergeContext); } @Override - public MergeSpecification findForcedDeletesMerges( - SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { - return delegatePolicy.findForcedDeletesMerges(segmentInfos, mergeContext); + public MergeSpecification findFullFlushMerges( + MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException { + return in.findFullFlushMerges(mergeTrigger, getFilteredInfosClone(infos), mergeContext); + } + + private SegmentInfos getFilteredInfosClone(SegmentInfos infos) { + // We should not remove from the original SegmentInfos. Hence we clone. + SegmentInfos infosClone = infos.clone(); + infosClone.clear(); + for (SegmentCommitInfo info : infos) { + if (allowSegmentForMerge(info)) { + infosClone.add(info); + } + } + return infosClone; + } + + /** + * Determines if a SegmentCommitInfo should be part of the candidate set of segments that will be + * considered for merges. By default, we only allow LATEST version segments to participate in + * merges. + */ + protected boolean allowSegmentForMerge(SegmentCommitInfo info) { + return info.info.getMinVersion() != null + && info.info.getMinVersion().major == Version.LATEST.major; } } From 4be5ca18db577ff052d23af23529258ee22620b1 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Sun, 30 Nov 2025 01:13:39 -0500 Subject: [PATCH 03/24] First draft: Add UPGRADECOREINDEX as a CoreAdmin API --- .../model/UpgradeCoreIndexRequestBody.java | 13 + .../handler/admin/CoreAdminOperation.java | 4 +- .../handler/admin/UpgradeCoreIndexOp.java | 50 ++ .../handler/admin/api/UpgradeCoreIndex.java | 670 ++++++++++++++++++ .../index/LatestVersionFilterMergePolicy.java | 10 +- .../solr/common/params/CoreAdminParams.java | 3 +- 6 files changed, 743 insertions(+), 7 deletions(-) create mode 100644 solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexRequestBody.java create mode 100644 solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java create mode 100644 solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java diff --git a/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexRequestBody.java b/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexRequestBody.java new file mode 100644 index 00000000000..0635db57fcd --- /dev/null +++ b/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexRequestBody.java @@ -0,0 +1,13 @@ +package org.apache.solr.client.api.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.media.Schema; + +public class UpgradeCoreIndexRequestBody { + + @Schema(description = "Request ID to track this action which will be processed asynchronously.") + @JsonProperty + public String async; + + @JsonProperty public String updateChain; +} diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java index 0a15462f13f..3cd2bb141e7 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java @@ -38,6 +38,7 @@ import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.STATUS; import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.SWAP; import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.UNLOAD; +import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.UPGRADECOREINDEX; import static org.apache.solr.handler.admin.CoreAdminHandler.CallInfo; import java.lang.invoke.MethodHandles; @@ -256,7 +257,8 @@ public enum CoreAdminOperation implements CoreAdminOp { final ListCoreSnapshotsResponse response = coreSnapshotAPI.listSnapshots(coreName); V2ApiUtils.squashIntoSolrResponseWithoutHeader(it.rsp, response); - }); + }), + UPGRADECOREINDEX_OP(UPGRADECOREINDEX, new UpgradeCoreIndexOp()); final CoreAdminParams.CoreAdminAction action; final CoreAdminOp fun; diff --git a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java new file mode 100644 index 00000000000..975fa7c40ef --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java @@ -0,0 +1,50 @@ +package org.apache.solr.handler.admin; + +import org.apache.lucene.index.IndexWriter; +import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.core.SolrCore; +import org.apache.solr.handler.admin.api.UpgradeCoreIndex; +import org.apache.solr.handler.api.V2ApiUtils; +import org.apache.solr.index.LatestVersionFilterMergePolicy; +import org.apache.solr.util.RefCounted; + +public class UpgradeCoreIndexOp implements CoreAdminHandler.CoreAdminOp { + @Override + public boolean isExpensive() { + return true; + } + + @Override + public void execute(CoreAdminHandler.CallInfo it) throws Exception { + SolrParams params = it.req.getParams(); + String cname = params.required().get(CoreAdminParams.CORE); + final var requestBody = new UpgradeCoreIndexRequestBody(); + requestBody.updateChain = params.get(UpdateParams.UPDATE_CHAIN); + RefCounted iwRef = null; + SolrCore core = it.req.getCore(); + try { + if (iwRef == null) { + iwRef = core.getSolrCoreState().getIndexWriter(core); + } + // set LatestVersionFilterMergePolicy as the merge policy to prevent older segments from + // participating in merges while we reindex + if (iwRef != null) { + IndexWriter iw = iwRef.get(); + iw.getConfig() + .setMergePolicy(new LatestVersionFilterMergePolicy(iw.getConfig().getMergePolicy())); + } + UpgradeCoreIndex upgradeCoreIndexApi = + new UpgradeCoreIndex( + it.handler.coreContainer, it.handler.coreAdminAsyncTracker, it.req, it.rsp); + final var response = upgradeCoreIndexApi.upgradeCoreIndex(cname, requestBody); + V2ApiUtils.squashIntoSolrResponseWithoutHeader(it.rsp, response); + } finally { + if (iwRef != null) { + iwRef.decref(); + } + } + } +} diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java new file mode 100644 index 00000000000..d4e095bc4fd --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -0,0 +1,670 @@ +package org.apache.solr.handler.admin.api; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.file.Paths; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Collection; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterLeafReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.Version; +import org.apache.solr.client.api.model.SolrJerseyResponse; +import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; +import org.apache.solr.handler.RequestHandlerBase; +import org.apache.solr.handler.admin.CoreAdminHandler; +import org.apache.solr.request.LocalSolrQueryRequest; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.request.SolrRequestHandler; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.DateValueFieldType; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.schema.SchemaField; +import org.apache.solr.search.DocValuesIteratorCache; +import org.apache.solr.search.SolrDocumentFetcher; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.update.AddUpdateCommand; +import org.apache.solr.update.processor.UpdateRequestProcessor; +import org.apache.solr.update.processor.UpdateRequestProcessorChain; +import org.apache.solr.util.RefCounted; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpgradeCoreIndex extends CoreAdminAPIBase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /* + * The re-indexing status at any point of time for a particular core. + * DEFAULT - This is the default status, meaning it is yet to be processed and checked if the version is LATEST + * for this core + * REINDEXING_ACTIVE - This is set at the start of the re-indexing operation + * PROCESSED - This is set at the end of the re-indexing operation if there are no errors + * ERROR - This is set if there is any error in any segment. This core will be retried CORE_ERROR_RETRIES number + * of + * times + * CORRECTVERSION - This is set if the core is already at the correct version + */ + public enum CoreReindexingStatus { + DEFAULT, + REINDEXING_ACTIVE, + REINDEXING_PAUSED, + PROCESSED, + ERROR, + CORRECTVERSION; + } + + /* + * The state that a single ReindexingThread would be in. This is set to START_REINDEXING by CPUMonitorTask + * thread + * START_REINDEXING - CPUMonitorTask checks if current CPU usage is below given threshold and sets this state + * WAITING - CPUMonitorTask checks if the CPU usage is above given threshold and sets this state to put + * the CVReindexingTask thread in a waiting state. Note that ReindexingTask thread run() will still be checked + * in this case. This can also be set when all cores have processed and there are no pending cores. + * STOP_REINDEXING - CPUMonitorTask checks if all cores are processed with the status between {CORRECTVERSION, + * ERROR} + * and if all cores are processed then sets it to STOP_REINDEXING + * + */ + public enum ReindexingThreadState { + START_REINDEXING, + STOP_REINDEXING, + WAITING; + } + + private static final long DEFAULT_EXECUTION_INTERVAL_MS = 60000; + + // private static final String reindexingStatusFileName = + // String.format("reindexing_status_%dx.csv", Version.LATEST.major); + private static final int SEGMENT_ERROR_RETRIES = 3; + private static final long SLEEP_TIME_BEFORE_AFTER_COMMIT_MS = 10000; + private static SolrInputDocument lastDoc; + private static final int RETRY_COUNT_FOR_SEGMENT_DELETION = 5; + private static final long SLEEP_TIME_SEGMENT_DELETION_MS = 60000; + private static final LocalDateTime defaultDtm = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC); + private static boolean fetchNumDocs = false; + + private static final DateTimeFormatter formatter = + DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss"); + + public UpgradeCoreIndex( + CoreContainer coreContainer, + CoreAdminHandler.CoreAdminAsyncTracker coreAdminAsyncTracker, + SolrQueryRequest req, + SolrQueryResponse rsp) { + super(coreContainer, coreAdminAsyncTracker, req, rsp); + } + + @Override + public boolean isExpensive() { + return true; + } + + public SolrJerseyResponse upgradeCoreIndex( + String coreName, UpgradeCoreIndexRequestBody requestBody) throws Exception { + ensureRequiredParameterProvided("coreName", coreName); + SolrJerseyResponse response = instantiateJerseyResponse(SolrJerseyResponse.class); + + return handlePotentiallyAsynchronousTask( + response, + coreName, + requestBody.async, + "upgrade-index", + () -> { + try (SolrCore core = coreContainer.getCore(coreName)) { + + log.warn("Processing core: {}", core.getName()); + CoreReindexingStatus coreRxStatus = CoreReindexingStatus.REINDEXING_ACTIVE; + + String indexDir = core.getIndexDir(); + + log.info("Starting to process core: {}", coreName); + + RefCounted ssearcherRef = core.getSearcher(); + List leafContexts = + ssearcherRef.get().getTopReaderContext().leaves(); + DocValuesIteratorCache dvICache = new DocValuesIteratorCache(ssearcherRef.get()); + + Map segmentsToUpgrade = getSegmentsToUpgrade(indexDir); + + log.info("Segments to upgrade: {}", segmentsToUpgrade.toString()); + + setLastDoc(null); + + UpdateRequestProcessorChain updateProcessorChain = getUpdateProcessorChain(core); + + try { + + for (int segmentIndex = 0, c = leafContexts.size(); + segmentIndex < c; + segmentIndex++) { + LeafReaderContext lrc = leafContexts.get(segmentIndex); + LeafReader leafReader = lrc.reader(); + leafReader = FilterLeafReader.unwrap(leafReader); + log.debug( + "LeafReader hashcode: {}, getCreatedVersionMajor: {}, getMinVersion:{} ", + leafReader.hashCode(), + leafReader.getMetaData().createdVersionMajor(), + leafReader.getMetaData().minVersion()); + + SegmentReader segmentReader = (SegmentReader) leafReader; + String currentSegmentName = segmentReader.getSegmentName(); + + if (segmentsToUpgrade.containsKey(currentSegmentName)) { + boolean segmentError = false; + LocalDateTime segmentRxStartTime = LocalDateTime.now(); + LocalDateTime segmentRxStopTime = LocalDateTime.MAX; + + for (int i = 0; i < SEGMENT_ERROR_RETRIES; i++) { + // retrying segment; I anticipate throttling to be the main reason in most + // cases + // hence the sleep + if (i > 0) { + Thread.sleep(5 * 60 * 1000); // 5 minutes + } + + log.info( + "Start processSegment run: {}, segment: {} at {}", + i, + segmentReader.getSegmentName(), + formatter.format(segmentRxStartTime)); + + segmentError = + processSegment( + segmentReader, + leafContexts, + segmentIndex, + updateProcessorChain, + core, + dvICache); + + segmentRxStopTime = LocalDateTime.now(); + log.info( + "End processSegment run: {}, segment: {} at {}", + i, + segmentReader.getSegmentName(), + formatter.format(segmentRxStopTime)); + // segmentError = true + if (segmentError) { + coreRxStatus = CoreReindexingStatus.ERROR; + log.error( + "processSegment returned : {} for segment : {}", + segmentError, + segmentReader.getSegmentName()); + } + } + + log.info( + "Segment: {} Elapsed time: {}, start time: {}, stop time: {}", + segmentReader.getSegmentName(), + DurationFormatUtils.formatDuration( + Duration.between(segmentRxStartTime, segmentRxStopTime).toMillis(), + "**H:mm:ss**", + true), + formatter.format(segmentRxStartTime), + formatter.format(segmentRxStopTime)); + + // We found and processed the correct segment for this iteration. + // No need to look at other leaves in the immediate outer for loop + break; + } + } + } catch (Exception e) { + log.error("Error while processing core: {}, exception: {}", coreName, e.toString()); + coreRxStatus = CoreReindexingStatus.ERROR; + } + + try { + RefCounted iwRef = core.getSolrCoreState().getIndexWriter(null); + if (iwRef != null) { + IndexWriter iw = iwRef.get(); + try { + if (iw != null) { + iw.commit(); + } else { + log.warn("IndexWriter for core {} is null", core.getName()); + } + } finally { + iwRef.decref(); + } + } else { + log.warn("IWRef for core {} is null", core.getName()); + } + } catch (IOException ioEx) { + + } + + // important to decrement searcher ref count after use since we obtained it via the + // SolrCore.getSearcher() method + ssearcherRef.decref(); + + // IF coreRxStatus == CoreReindexingStatus.REINDEXING_PAUSED at this point then most + // likely it + // reached here + // by breaking out of segment processing. So we are going straight to setting the state + // and + // publishing to reindexing_status.csv + if (coreRxStatus != CoreReindexingStatus.REINDEXING_PAUSED) { + try { + if (coreRxStatus == CoreReindexingStatus.ERROR) { + log.error("Core CoreReindexingStatus returned error, not calling commit"); + } else { + Boolean validationResult = false; + for (int i = 0; + (i < RETRY_COUNT_FOR_SEGMENT_DELETION) + && (validationResult != null && !validationResult); + i++) { + + doCommit(core); + Thread.sleep(SLEEP_TIME_BEFORE_AFTER_COMMIT_MS); + + validationResult = validateSegmentsUpdated(core); + log.warn( + "validateSegmentsUpdated() returned: {} for core: {}, sleeping for {}ms before calling commit...", + validationResult, + coreName, + SLEEP_TIME_SEGMENT_DELETION_MS); + Thread.sleep(SLEEP_TIME_SEGMENT_DELETION_MS); + } + if ((validationResult == null) + || (validationResult != null && !validationResult)) { + log.error( + "Validation failed for core: {}, not increasing indexCreatedVersionMajor", + validationResult, + coreName); + coreRxStatus = CoreReindexingStatus.ERROR; + } else { + + doCommit(core); + Thread.sleep(SLEEP_TIME_BEFORE_AFTER_COMMIT_MS); + + int indexCreatedVersionMajorAfterCommit = getIndexCreatedVersionMajor(core); + log.info( + "Post processing coreName: {}, indexCreatedVersionMajorAfterCommit: {}", + coreName, + indexCreatedVersionMajorAfterCommit); + if (indexCreatedVersionMajorAfterCommit == Version.LATEST.major) { + log.info( + "Core: {} index version updated successfully to {}", + coreName, + Version.LATEST); + coreRxStatus = CoreReindexingStatus.PROCESSED; + } else { + log.error( + "indexCreatedVersionMajorAfterCommit is {}", + indexCreatedVersionMajorAfterCommit); + coreRxStatus = CoreReindexingStatus.ERROR; + } + } + } + } catch (Exception e) { + log.error("Exception in processCore: {}", e.toString()); + } + } + } + return null; + }); + } + + private int getIndexCreatedVersionMajor(SolrCore core) { + int indexCreatedVersionMajor = 0; + try (FSDirectory dir = FSDirectory.open(Paths.get(core.getIndexDir()))) { + SegmentInfos sis = SegmentInfos.readLatestCommit(dir); + indexCreatedVersionMajor = sis.getIndexCreatedVersionMajor(); + } catch (Exception e) { + log.error( + "Error while opening segmentInfos for core: {}, exception: {}", + core.getName(), + e.toString()); + } + + return indexCreatedVersionMajor; + } + + @SuppressWarnings({"rawtypes"}) + private UpdateRequestProcessorChain getUpdateProcessorChain(SolrCore core) { + + SolrRequestHandler reqHandler = core.getRequestHandler("/update"); + NamedList initArgs = ((RequestHandlerBase) reqHandler).getInitArgs(); + + String updateChainName = null; + Object defaults = initArgs.get("defaults"); + if (defaults != null && defaults instanceof NamedList) { + updateChainName = (String) (((NamedList) defaults).get(UpdateParams.UPDATE_CHAIN)); + } + if (updateChainName == null) { + Object invariants = initArgs.get("invariants"); + if (invariants != null && invariants instanceof NamedList) { + updateChainName = (String) (((NamedList) invariants).get(UpdateParams.UPDATE_CHAIN)); + } + } + + return core.getUpdateProcessingChain(updateChainName); + } + + /* + * returns: + * + * null: For any error or if there is at least one older version segment present in the index + * false: For any 0 older version segment present in the index having 0 numDocs + * true: If all segments are LATEST version + * + */ + private Boolean validateSegmentsUpdated(SolrCore core) { + Boolean segmentsUpdated = null; + try (FSDirectory dir = FSDirectory.open(Paths.get(core.getIndexDir())); + IndexReader reader = DirectoryReader.open(dir)) { + + List leaves = reader.leaves(); + if (leaves == null || leaves.isEmpty()) { + // no segments to process/validate + return true; + } + segmentsUpdated = true; + for (LeafReaderContext lrc : leaves) { + LeafReader leafReader = lrc.reader(); + leafReader = FilterLeafReader.unwrap(leafReader); + if (leafReader instanceof SegmentReader) { + SegmentReader segmentReader = (SegmentReader) leafReader; + SegmentCommitInfo si = segmentReader.getSegmentInfo(); + Version segMinVersion = si.info.getMinVersion(); + if (segMinVersion == null || segMinVersion.major != Version.LATEST.major) { + log.warn( + "validateSegmentsUpdated(): Core: {}, Segment {} is still at minVersion: {} and is not updated to the latest version {}", + core.getName(), + si.info.name, + (segMinVersion == null ? 6 : segMinVersion.major), + Version.LATEST.major); + segmentsUpdated = null; + // Since we could have 1 0-numDoc segment and multiple non-zero numDoc + // older version segments, we break only if a 0-numDoc segment is found + if (segmentReader.numDocs() == 0) { + segmentsUpdated = false; + break; + } + } + } + } + } catch (Exception e) { + log.error( + "Error while opening segmentInfos for core: {}, exception: {}", + core.getName(), + e.toString()); + segmentsUpdated = null; + } + return segmentsUpdated; + } + + private void doCommit(SolrCore core) { + try { + SolrInputDocument dummyDoc = null; + SolrInputDocument lastDoc = getLastDoc(); + if (lastDoc == null) { + // set dummy doc for commit to take effect especially in case of 0-doc cores + dummyDoc = getDummyDoc(core); + lastDoc = dummyDoc; + } + + UpdateRequest updateReq = new UpdateRequest(); + updateReq.add(lastDoc); + if (log.isDebugEnabled()) { + log.debug("Last solr Doc keySet: {}", lastDoc.keySet().toString()); + } + ModifiableSolrParams msp = new ModifiableSolrParams(); + + msp.add("commit", "true"); + LocalSolrQueryRequest solrReq; + solrReq = getLocalUpdateReq(updateReq, core, msp); + updateReq.getDocumentsMap().clear(); + if (log.isDebugEnabled()) { + log.debug( + "Calling commit. Solr params: {}, CvReindexingTask.getLastDoc(): {}", + msp.toString(), + lastDoc.toString()); + } + doLocalUpdateReq(solrReq, core); + + if (dummyDoc != null) { + deleteDummyDocAndCommit( + core, + (String) dummyDoc.getFieldValue(core.getLatestSchema().getUniqueKeyField().getName())); + } + } catch (Exception e) { + log.error( + "Error while sending update request to advance index created version {}", e.toString()); + } + } + + private void deleteDummyDocAndCommit(SolrCore core, String dummyContentId) throws Exception { + UpdateRequest updateReq = new UpdateRequest(); + updateReq.deleteById(dummyContentId); + log.debug("Deleting dummy doc with id: {}", dummyContentId); + ModifiableSolrParams msp = new ModifiableSolrParams(); + + msp.add("commit", "true"); + LocalSolrQueryRequest solrReq; + try { + solrReq = getLocalUpdateReq(updateReq, core, msp); + doLocalUpdateReq(solrReq, core); + } catch (Exception e) { + log.error("Error deleting dummy doc"); + throw e; + } + } + + public LocalSolrQueryRequest getLocalUpdateReq( + UpdateRequest updateReq, SolrCore core, ModifiableSolrParams msp) throws IOException { + LocalSolrQueryRequest solrReq = new LocalSolrQueryRequest(core, msp); + solrReq.setContentStreams(updateReq.getContentStreams()); + return solrReq; + } + + public static void doLocalUpdateReq(LocalSolrQueryRequest solrReq, SolrCore core) { + try { + SolrQueryResponse resp = new SolrQueryResponse(); + core.getRequestHandler("/update").handleRequest(solrReq, resp); + if (resp.getException() != null) { + log.error("doLocalUpdateReq error: {}", resp.getException().toString()); + } + } catch (Exception e) { + log.error("Exception in doLocalUpdateReq: {}", e.toString()); + } finally { + solrReq.close(); + } + } + + private SolrInputDocument getDummyDoc(SolrCore core) { + SolrInputDocument dummyDoc = new SolrInputDocument(); + String dummyContentId = "cvrx-dummydoc" + UUID.randomUUID().toString(); + String uniqeKeyFieldName = core.getLatestSchema().getUniqueKeyField().getName(); + dummyDoc.addField(uniqeKeyFieldName, dummyContentId); + Collection requiredFields = core.getLatestSchema().getRequiredFields(); + + for (SchemaField sf : requiredFields) { + if (sf.getName().equals(uniqeKeyFieldName) || sf.getDefaultValue() != null) { + continue; + } + if (sf.getType() instanceof DateValueFieldType) { + dummyDoc.addField(sf.getName(), new Date()); + } else { + dummyDoc.addField(sf.getName(), "1"); + } + } + return dummyDoc; + } + + private static void setLastDoc(SolrInputDocument solrDoc) { + lastDoc = solrDoc; + } + + private static Map getSegmentsToUpgrade(String indexDir) { + Map segmentsToUpgrade = new LinkedHashMap<>(); + try (Directory dir = FSDirectory.open(Paths.get(indexDir)); + IndexReader reader = DirectoryReader.open(dir)) { + for (LeafReaderContext lrc : reader.leaves()) { + LeafReader leafReader = lrc.reader(); + leafReader = FilterLeafReader.unwrap(leafReader); + + SegmentReader segmentReader = (SegmentReader) leafReader; + Version segmentMinVersion = segmentReader.getSegmentInfo().info.getMinVersion(); + if (segmentMinVersion == null || segmentMinVersion.major < Version.LATEST.major) { + segmentsToUpgrade.put( + segmentReader.getSegmentName(), segmentReader.getSegmentInfo().sizeInBytes()); + } else { + log.debug( + "Segment: {} shall be skipped since minVersion already at {}", + segmentReader.getSegmentName(), + segmentReader.getSegmentInfo().info.getMinVersion()); + } + } + } catch (Exception e) { + log.error("Exception while gettting segments to be uploaded from indexDir: {}", e.toString()); + } + return segmentsToUpgrade; + } + + private boolean processSegment( + SegmentReader segmentReader, + List leafContexts, + int segmentIndex, + UpdateRequestProcessorChain processorChain, + SolrCore core, + DocValuesIteratorCache dvICache) { + + boolean segmentError = true; + int numDocsProcessed = 0; + int numDocsAccum = 0; + String coreName = core.getName(); + IndexSchema indexSchema = core.getLatestSchema(); + Bits bits = segmentReader.getLiveDocs(); + SolrInputDocument solrDoc = null; + UpdateRequestProcessor processor = null; + RefCounted searcherRef = core.getSearcher(); + SolrDocumentFetcher docFetcher = searcherRef.get().getDocFetcher(); + try { + Set fields = docFetcher.getNonStoredDVsWithoutCopyTargets(); + LocalSolrQueryRequest solrRequest = + new LocalSolrQueryRequest(core, new ModifiableSolrParams()); + + SolrQueryResponse rsp = new SolrQueryResponse(); + processor = processorChain.createProcessor(solrRequest, rsp); + StoredFields storedFields = segmentReader.storedFields(); + for (int luceneDocId = 0; luceneDocId < segmentReader.maxDoc(); luceneDocId++) { + if (bits != null && !bits.get(luceneDocId)) { + continue; + } + + Document doc = storedFields.document(luceneDocId); + solrDoc = toSolrInputDocument(doc, indexSchema); + + docFetcher.decorateDocValueFields( + solrDoc, leafContexts.get(segmentIndex).docBase + luceneDocId, fields, dvICache); + solrDoc.removeField("_version_"); + AddUpdateCommand currDocCmd = new AddUpdateCommand(solrRequest); + currDocCmd.solrDoc = solrDoc; + processor.processAdd(currDocCmd); + numDocsProcessed++; + numDocsAccum++; + if (fetchNumDocs) { + numDocsAccum = 0; + } + } + } catch (IOException e) { + log.error("Error in CvReindexingTask process() : {}", e.toString()); + segmentError = true; + } finally { + searcherRef.decref(); + if (processor != null) { + try { + processor.finish(); + } catch (Exception e) { + log.error("Exception while doing finish processor.finish() : {}", e.toString()); + segmentError = true; + } finally { + try { + processor.close(); + } catch (IOException e) { + log.error("Exception while closing processor: {}", e.toString()); + segmentError = true; + } + } + } + } + if (solrDoc != null) { + setLastDoc(new SolrInputDocument(solrDoc)); + getLastDoc().removeField("_version_"); + } + segmentError = false; + log.info( + "End processing segment : {}, core: {} docs processed: {}", + segmentReader.getSegmentName(), + coreName, + numDocsProcessed); + + return segmentError; + } + + public SolrInputDocument getLastDoc() { + return lastDoc; + } + + /* + * Convert a lucene Document to a SolrInputDocument + */ + protected SolrInputDocument toSolrInputDocument( + org.apache.lucene.document.Document doc, IndexSchema schema) { + SolrInputDocument out = new SolrInputDocument(); + for (IndexableField f : doc.getFields()) { + String fname = f.name(); + SchemaField sf = schema.getFieldOrNull(f.name()); + Object val = null; + if (sf != null) { + if ((!sf.hasDocValues() && !sf.stored()) || schema.isCopyFieldTarget(sf)) { + continue; + } + val = sf.getType().toObject(f); + } else { + val = f.stringValue(); + if (val == null) { + val = f.numericValue(); + } + if (val == null) { + val = f.binaryValue(); + } + if (val == null) { + val = f; + } + } + out.addField(fname, val); + } + return out; + } +} diff --git a/solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java b/solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java index 5841f99fb75..412606f3220 100644 --- a/solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java +++ b/solr/core/src/java/org/apache/solr/index/LatestVersionFilterMergePolicy.java @@ -10,11 +10,11 @@ import org.apache.lucene.util.Version; /** - * Prevents any older version segment (< {@link Version.LATEST}), either original or one derived as - * a result of merging with an older version segment, from being considered for merges. That way a - * snapshot of older segments remains consistent. This assists in upgrading to a future Lucene major - * version if existing documents are reindexed in the current version with this merge policy in - * place. + * Prevents any older version segment (i.e. older than latest lucene major version), either original + * or one derived as a result of merging with an older version segment, from being considered for + * merges. That way a snapshot of older segments remains consistent. This assists in upgrading to a + * future Lucene major version if existing documents are reindexed in the current version with this + * merge policy in place. */ public class LatestVersionFilterMergePolicy extends FilterMergePolicy { diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java index ea220642124..ad41867cc31 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java @@ -178,7 +178,8 @@ public enum CoreAdminAction { INSTALLCOREDATA, CREATESNAPSHOT, DELETESNAPSHOT, - LISTSNAPSHOTS; + LISTSNAPSHOTS, + UPGRADECOREINDEX; public final boolean isRead; From 052cda6a438bff0c23888d73b85eac1a4bf4569f Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Sun, 30 Nov 2025 01:55:00 -0500 Subject: [PATCH 04/24] fix incorrect early-exit while iterating over segments --- .../org/apache/solr/handler/admin/api/UpgradeCoreIndex.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index d4e095bc4fd..c4a50ac72b0 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -231,10 +231,6 @@ public SolrJerseyResponse upgradeCoreIndex( true), formatter.format(segmentRxStartTime), formatter.format(segmentRxStopTime)); - - // We found and processed the correct segment for this iteration. - // No need to look at other leaves in the immediate outer for loop - break; } } } catch (Exception e) { From c9e3f3ed974dc6e64b8901a5190d88d9d51d60b1 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Thu, 11 Dec 2025 23:57:55 -0500 Subject: [PATCH 05/24] restore original merge policy; injectable factory for test stubs; option for custom update.chain --- .../handler/admin/UpgradeCoreIndexOp.java | 34 ++++++++-- .../handler/admin/api/UpgradeCoreIndex.java | 62 ++++++++++++------- 2 files changed, 68 insertions(+), 28 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java index 975fa7c40ef..b29abdaa14c 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java @@ -1,17 +1,32 @@ package org.apache.solr.handler.admin; import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.MergePolicy; import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.core.CoreContainer; import org.apache.solr.core.SolrCore; import org.apache.solr.handler.admin.api.UpgradeCoreIndex; import org.apache.solr.handler.api.V2ApiUtils; import org.apache.solr.index.LatestVersionFilterMergePolicy; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.util.RefCounted; public class UpgradeCoreIndexOp implements CoreAdminHandler.CoreAdminOp { + @FunctionalInterface + public interface UpgradeCoreIndexFactory { + UpgradeCoreIndex create( + CoreContainer coreContainer, + CoreAdminHandler.CoreAdminAsyncTracker coreAdminAsyncTracker, + SolrQueryRequest req, + SolrQueryResponse rsp); + } + + static UpgradeCoreIndexFactory UPGRADE_CORE_INDEX_FACTORY = UpgradeCoreIndex::new; + @Override public boolean isExpensive() { return true; @@ -24,24 +39,33 @@ public void execute(CoreAdminHandler.CallInfo it) throws Exception { final var requestBody = new UpgradeCoreIndexRequestBody(); requestBody.updateChain = params.get(UpdateParams.UPDATE_CHAIN); RefCounted iwRef = null; + IndexWriter iw = null; + MergePolicy originalMergePolicy = null; SolrCore core = it.req.getCore(); try { - if (iwRef == null) { - iwRef = core.getSolrCoreState().getIndexWriter(core); - } + iwRef = core.getSolrCoreState().getIndexWriter(core); + // set LatestVersionFilterMergePolicy as the merge policy to prevent older segments from // participating in merges while we reindex if (iwRef != null) { - IndexWriter iw = iwRef.get(); + iw = iwRef.get(); + } + if (iw != null) { + originalMergePolicy = iw.getConfig().getMergePolicy(); iw.getConfig() .setMergePolicy(new LatestVersionFilterMergePolicy(iw.getConfig().getMergePolicy())); } UpgradeCoreIndex upgradeCoreIndexApi = - new UpgradeCoreIndex( + UPGRADE_CORE_INDEX_FACTORY.create( it.handler.coreContainer, it.handler.coreAdminAsyncTracker, it.req, it.rsp); final var response = upgradeCoreIndexApi.upgradeCoreIndex(cname, requestBody); V2ApiUtils.squashIntoSolrResponseWithoutHeader(it.rsp, response); } finally { + // reset original merge policy + if (iw != null && originalMergePolicy != null) { + iw.getConfig().setMergePolicy(originalMergePolicy); + } + if (iwRef != null) { iwRef.decref(); } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index c4a50ac72b0..634257dab37 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -106,15 +106,15 @@ public enum ReindexingThreadState { // String.format("reindexing_status_%dx.csv", Version.LATEST.major); private static final int SEGMENT_ERROR_RETRIES = 3; private static final long SLEEP_TIME_BEFORE_AFTER_COMMIT_MS = 10000; - private static SolrInputDocument lastDoc; private static final int RETRY_COUNT_FOR_SEGMENT_DELETION = 5; private static final long SLEEP_TIME_SEGMENT_DELETION_MS = 60000; private static final LocalDateTime defaultDtm = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC); - private static boolean fetchNumDocs = false; private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss"); + private SolrInputDocument lastDoc; + public UpgradeCoreIndex( CoreContainer coreContainer, CoreAdminHandler.CoreAdminAsyncTracker coreAdminAsyncTracker, @@ -159,7 +159,8 @@ public SolrJerseyResponse upgradeCoreIndex( setLastDoc(null); - UpdateRequestProcessorChain updateProcessorChain = getUpdateProcessorChain(core); + UpdateRequestProcessorChain updateProcessorChain = + getUpdateProcessorChain(core, requestBody.updateChain); try { @@ -179,7 +180,7 @@ public SolrJerseyResponse upgradeCoreIndex( String currentSegmentName = segmentReader.getSegmentName(); if (segmentsToUpgrade.containsKey(currentSegmentName)) { - boolean segmentError = false; + boolean segmentError = true; LocalDateTime segmentRxStartTime = LocalDateTime.now(); LocalDateTime segmentRxStopTime = LocalDateTime.MAX; @@ -212,15 +213,17 @@ public SolrJerseyResponse upgradeCoreIndex( i, segmentReader.getSegmentName(), formatter.format(segmentRxStopTime)); - // segmentError = true - if (segmentError) { - coreRxStatus = CoreReindexingStatus.ERROR; - log.error( - "processSegment returned : {} for segment : {}", - segmentError, - segmentReader.getSegmentName()); + if (!segmentError) { + break; } } + if (segmentError) { + coreRxStatus = CoreReindexingStatus.ERROR; + log.error( + "processSegment returned : {} for segment : {}", + segmentError, + segmentReader.getSegmentName()); + } log.info( "Segment: {} Elapsed time: {}, start time: {}, stop time: {}", @@ -346,7 +349,20 @@ private int getIndexCreatedVersionMajor(SolrCore core) { } @SuppressWarnings({"rawtypes"}) - private UpdateRequestProcessorChain getUpdateProcessorChain(SolrCore core) { + private UpdateRequestProcessorChain getUpdateProcessorChain( + SolrCore core, String requestedUpdateChain) { + + if (requestedUpdateChain != null) { + UpdateRequestProcessorChain requestedChain = + core.getUpdateProcessingChain(requestedUpdateChain); + if (requestedChain != null) { + return requestedChain; + } + log.warn( + "Requested update chain {} not found for core {}, falling back to default", + requestedUpdateChain, + core.getName()); + } SolrRequestHandler reqHandler = core.getRequestHandler("/update"); NamedList initArgs = ((RequestHandlerBase) reqHandler).getInitArgs(); @@ -517,7 +533,7 @@ private SolrInputDocument getDummyDoc(SolrCore core) { return dummyDoc; } - private static void setLastDoc(SolrInputDocument solrDoc) { + private void setLastDoc(SolrInputDocument solrDoc) { lastDoc = solrDoc; } @@ -555,7 +571,7 @@ private boolean processSegment( SolrCore core, DocValuesIteratorCache dvICache) { - boolean segmentError = true; + boolean segmentError = false; int numDocsProcessed = 0; int numDocsAccum = 0; String coreName = core.getName(); @@ -563,12 +579,13 @@ private boolean processSegment( Bits bits = segmentReader.getLiveDocs(); SolrInputDocument solrDoc = null; UpdateRequestProcessor processor = null; + LocalSolrQueryRequest solrRequest = null; RefCounted searcherRef = core.getSearcher(); SolrDocumentFetcher docFetcher = searcherRef.get().getDocFetcher(); try { + // Exclude copy field targets to avoid duplicating values on reindex Set fields = docFetcher.getNonStoredDVsWithoutCopyTargets(); - LocalSolrQueryRequest solrRequest = - new LocalSolrQueryRequest(core, new ModifiableSolrParams()); + solrRequest = new LocalSolrQueryRequest(core, new ModifiableSolrParams()); SolrQueryResponse rsp = new SolrQueryResponse(); processor = processorChain.createProcessor(solrRequest, rsp); @@ -589,15 +606,11 @@ private boolean processSegment( processor.processAdd(currDocCmd); numDocsProcessed++; numDocsAccum++; - if (fetchNumDocs) { - numDocsAccum = 0; - } } - } catch (IOException e) { + } catch (Exception e) { log.error("Error in CvReindexingTask process() : {}", e.toString()); segmentError = true; } finally { - searcherRef.decref(); if (processor != null) { try { processor.finish(); @@ -613,12 +626,15 @@ private boolean processSegment( } } } + if (solrRequest != null) { + solrRequest.close(); + } + searcherRef.decref(); } - if (solrDoc != null) { + if (!segmentError && solrDoc != null) { setLastDoc(new SolrInputDocument(solrDoc)); getLastDoc().removeField("_version_"); } - segmentError = false; log.info( "End processing segment : {}, core: {} docs processed: {}", segmentReader.getSegmentName(), From daaaecb0d0e2d3e2e81a447571d8234cbec81169 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Fri, 12 Dec 2025 01:56:57 -0500 Subject: [PATCH 06/24] cleanup and optimizations --- .../handler/admin/api/UpgradeCoreIndex.java | 164 +++++------------- 1 file changed, 40 insertions(+), 124 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index 634257dab37..9d58f05521a 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -3,7 +3,6 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.nio.file.Paths; -import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; @@ -14,7 +13,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.lucene.document.Document; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FilterLeafReader; @@ -113,8 +111,6 @@ public enum ReindexingThreadState { private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss"); - private SolrInputDocument lastDoc; - public UpgradeCoreIndex( CoreContainer coreContainer, CoreAdminHandler.CoreAdminAsyncTracker coreAdminAsyncTracker, @@ -141,99 +137,42 @@ public SolrJerseyResponse upgradeCoreIndex( () -> { try (SolrCore core = coreContainer.getCore(coreName)) { - log.warn("Processing core: {}", core.getName()); + log.info("Received UPGRADECOREINDEX request for core: {}", core.getName()); CoreReindexingStatus coreRxStatus = CoreReindexingStatus.REINDEXING_ACTIVE; - String indexDir = core.getIndexDir(); - - log.info("Starting to process core: {}", coreName); - RefCounted ssearcherRef = core.getSearcher(); List leafContexts = ssearcherRef.get().getTopReaderContext().leaves(); DocValuesIteratorCache dvICache = new DocValuesIteratorCache(ssearcherRef.get()); - Map segmentsToUpgrade = getSegmentsToUpgrade(indexDir); - - log.info("Segments to upgrade: {}", segmentsToUpgrade.toString()); - - setLastDoc(null); - UpdateRequestProcessorChain updateProcessorChain = getUpdateProcessorChain(core, requestBody.updateChain); try { - for (int segmentIndex = 0, c = leafContexts.size(); - segmentIndex < c; - segmentIndex++) { - LeafReaderContext lrc = leafContexts.get(segmentIndex); - LeafReader leafReader = lrc.reader(); - leafReader = FilterLeafReader.unwrap(leafReader); - log.debug( - "LeafReader hashcode: {}, getCreatedVersionMajor: {}, getMinVersion:{} ", - leafReader.hashCode(), - leafReader.getMetaData().createdVersionMajor(), - leafReader.getMetaData().minVersion()); - - SegmentReader segmentReader = (SegmentReader) leafReader; - String currentSegmentName = segmentReader.getSegmentName(); - - if (segmentsToUpgrade.containsKey(currentSegmentName)) { - boolean segmentError = true; - LocalDateTime segmentRxStartTime = LocalDateTime.now(); - LocalDateTime segmentRxStopTime = LocalDateTime.MAX; - - for (int i = 0; i < SEGMENT_ERROR_RETRIES; i++) { - // retrying segment; I anticipate throttling to be the main reason in most - // cases - // hence the sleep - if (i > 0) { - Thread.sleep(5 * 60 * 1000); // 5 minutes - } + for (LeafReaderContext lrc : leafContexts) { + if (!shouldUpgradeSegment(lrc)) { + continue; + } - log.info( - "Start processSegment run: {}, segment: {} at {}", - i, - segmentReader.getSegmentName(), - formatter.format(segmentRxStartTime)); - - segmentError = - processSegment( - segmentReader, - leafContexts, - segmentIndex, - updateProcessorChain, - core, - dvICache); - - segmentRxStopTime = LocalDateTime.now(); - log.info( - "End processSegment run: {}, segment: {} at {}", - i, - segmentReader.getSegmentName(), - formatter.format(segmentRxStopTime)); - if (!segmentError) { - break; - } - } - if (segmentError) { - coreRxStatus = CoreReindexingStatus.ERROR; - log.error( - "processSegment returned : {} for segment : {}", - segmentError, - segmentReader.getSegmentName()); + boolean segmentError = true; + + for (int i = 0; i < SEGMENT_ERROR_RETRIES; i++) { + // retrying segment; I anticipate throttling to be the main reason in most + // cases + // hence the sleep + if (i > 0) { + Thread.sleep(5 * 60 * 1000); // 5 minutes } - log.info( - "Segment: {} Elapsed time: {}, start time: {}, stop time: {}", - segmentReader.getSegmentName(), - DurationFormatUtils.formatDuration( - Duration.between(segmentRxStartTime, segmentRxStopTime).toMillis(), - "**H:mm:ss**", - true), - formatter.format(segmentRxStartTime), - formatter.format(segmentRxStopTime)); + segmentError = processSegment(lrc, updateProcessorChain, core, dvICache); + + if (!segmentError) { + break; + } + } + if (segmentError) { + coreRxStatus = CoreReindexingStatus.ERROR; } } } catch (Exception e) { @@ -333,6 +272,17 @@ public SolrJerseyResponse upgradeCoreIndex( }); } + private boolean shouldUpgradeSegment(LeafReaderContext lrc) { + Version segmentMinVersion = null; + try (LeafReader leafReader = lrc.reader()) { + segmentMinVersion = leafReader.getMetaData().minVersion(); + } catch (IOException ex) { + // TO-DO + // Wrap exception in CoreAdminAPIBaseException + } + return (segmentMinVersion == null || segmentMinVersion.major < Version.LATEST.major); + } + private int getIndexCreatedVersionMajor(SolrCore core) { int indexCreatedVersionMajor = 0; try (FSDirectory dir = FSDirectory.open(Paths.get(core.getIndexDir()))) { @@ -437,38 +387,15 @@ private Boolean validateSegmentsUpdated(SolrCore core) { private void doCommit(SolrCore core) { try { - SolrInputDocument dummyDoc = null; - SolrInputDocument lastDoc = getLastDoc(); - if (lastDoc == null) { - // set dummy doc for commit to take effect especially in case of 0-doc cores - dummyDoc = getDummyDoc(core); - lastDoc = dummyDoc; - } - UpdateRequest updateReq = new UpdateRequest(); - updateReq.add(lastDoc); - if (log.isDebugEnabled()) { - log.debug("Last solr Doc keySet: {}", lastDoc.keySet().toString()); - } + ModifiableSolrParams msp = new ModifiableSolrParams(); msp.add("commit", "true"); LocalSolrQueryRequest solrReq; solrReq = getLocalUpdateReq(updateReq, core, msp); - updateReq.getDocumentsMap().clear(); - if (log.isDebugEnabled()) { - log.debug( - "Calling commit. Solr params: {}, CvReindexingTask.getLastDoc(): {}", - msp.toString(), - lastDoc.toString()); - } doLocalUpdateReq(solrReq, core); - if (dummyDoc != null) { - deleteDummyDocAndCommit( - core, - (String) dummyDoc.getFieldValue(core.getLatestSchema().getUniqueKeyField().getName())); - } } catch (Exception e) { log.error( "Error while sending update request to advance index created version {}", e.toString()); @@ -533,10 +460,6 @@ private SolrInputDocument getDummyDoc(SolrCore core) { return dummyDoc; } - private void setLastDoc(SolrInputDocument solrDoc) { - lastDoc = solrDoc; - } - private static Map getSegmentsToUpgrade(String indexDir) { Map segmentsToUpgrade = new LinkedHashMap<>(); try (Directory dir = FSDirectory.open(Paths.get(indexDir)); @@ -564,18 +487,19 @@ private static Map getSegmentsToUpgrade(String indexDir) { } private boolean processSegment( - SegmentReader segmentReader, - List leafContexts, - int segmentIndex, + LeafReaderContext leafReaderContext, UpdateRequestProcessorChain processorChain, SolrCore core, DocValuesIteratorCache dvICache) { boolean segmentError = false; int numDocsProcessed = 0; - int numDocsAccum = 0; + String coreName = core.getName(); IndexSchema indexSchema = core.getLatestSchema(); + + LeafReader leafReader = FilterLeafReader.unwrap(leafReaderContext.reader()); + SegmentReader segmentReader = (SegmentReader) leafReader; Bits bits = segmentReader.getLiveDocs(); SolrInputDocument solrDoc = null; UpdateRequestProcessor processor = null; @@ -599,13 +523,12 @@ private boolean processSegment( solrDoc = toSolrInputDocument(doc, indexSchema); docFetcher.decorateDocValueFields( - solrDoc, leafContexts.get(segmentIndex).docBase + luceneDocId, fields, dvICache); + solrDoc, leafReaderContext.docBase + luceneDocId, fields, dvICache); solrDoc.removeField("_version_"); AddUpdateCommand currDocCmd = new AddUpdateCommand(solrRequest); currDocCmd.solrDoc = solrDoc; processor.processAdd(currDocCmd); numDocsProcessed++; - numDocsAccum++; } } catch (Exception e) { log.error("Error in CvReindexingTask process() : {}", e.toString()); @@ -631,10 +554,7 @@ private boolean processSegment( } searcherRef.decref(); } - if (!segmentError && solrDoc != null) { - setLastDoc(new SolrInputDocument(solrDoc)); - getLastDoc().removeField("_version_"); - } + log.info( "End processing segment : {}, core: {} docs processed: {}", segmentReader.getSegmentName(), @@ -644,10 +564,6 @@ private boolean processSegment( return segmentError; } - public SolrInputDocument getLastDoc() { - return lastDoc; - } - /* * Convert a lucene Document to a SolrInputDocument */ From 1fa4f2a05a7dee1044787fd499e87ed0727169cc Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Fri, 12 Dec 2025 02:22:43 -0500 Subject: [PATCH 07/24] more cleanup --- .../handler/admin/api/UpgradeCoreIndex.java | 36 ------------------- 1 file changed, 36 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index 9d58f05521a..ac7fcf749ff 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -3,14 +3,10 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.nio.file.Paths; -import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Collection; import java.util.Date; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.lucene.document.Document; @@ -25,7 +21,6 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.StoredFields; -import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.Version; @@ -98,15 +93,10 @@ public enum ReindexingThreadState { WAITING; } - private static final long DEFAULT_EXECUTION_INTERVAL_MS = 60000; - - // private static final String reindexingStatusFileName = - // String.format("reindexing_status_%dx.csv", Version.LATEST.major); private static final int SEGMENT_ERROR_RETRIES = 3; private static final long SLEEP_TIME_BEFORE_AFTER_COMMIT_MS = 10000; private static final int RETRY_COUNT_FOR_SEGMENT_DELETION = 5; private static final long SLEEP_TIME_SEGMENT_DELETION_MS = 60000; - private static final LocalDateTime defaultDtm = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC); private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss"); @@ -460,32 +450,6 @@ private SolrInputDocument getDummyDoc(SolrCore core) { return dummyDoc; } - private static Map getSegmentsToUpgrade(String indexDir) { - Map segmentsToUpgrade = new LinkedHashMap<>(); - try (Directory dir = FSDirectory.open(Paths.get(indexDir)); - IndexReader reader = DirectoryReader.open(dir)) { - for (LeafReaderContext lrc : reader.leaves()) { - LeafReader leafReader = lrc.reader(); - leafReader = FilterLeafReader.unwrap(leafReader); - - SegmentReader segmentReader = (SegmentReader) leafReader; - Version segmentMinVersion = segmentReader.getSegmentInfo().info.getMinVersion(); - if (segmentMinVersion == null || segmentMinVersion.major < Version.LATEST.major) { - segmentsToUpgrade.put( - segmentReader.getSegmentName(), segmentReader.getSegmentInfo().sizeInBytes()); - } else { - log.debug( - "Segment: {} shall be skipped since minVersion already at {}", - segmentReader.getSegmentName(), - segmentReader.getSegmentInfo().info.getMinVersion()); - } - } - } catch (Exception e) { - log.error("Exception while gettting segments to be uploaded from indexDir: {}", e.toString()); - } - return segmentsToUpgrade; - } - private boolean processSegment( LeafReaderContext leafReaderContext, UpdateRequestProcessorChain processorChain, From de455df88feef24e43797c638529258f76ad05eb Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Sat, 13 Dec 2025 12:13:24 -0500 Subject: [PATCH 08/24] get default update chain if update.chain is not defined for /update --- .../solr/handler/admin/api/UpgradeCoreIndex.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index ac7fcf749ff..cc6bc3a56e5 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -129,7 +129,6 @@ public SolrJerseyResponse upgradeCoreIndex( log.info("Received UPGRADECOREINDEX request for core: {}", core.getName()); CoreReindexingStatus coreRxStatus = CoreReindexingStatus.REINDEXING_ACTIVE; - String indexDir = core.getIndexDir(); RefCounted ssearcherRef = core.getSearcher(); List leafContexts = ssearcherRef.get().getTopReaderContext().leaves(); @@ -292,11 +291,11 @@ private int getIndexCreatedVersionMajor(SolrCore core) { private UpdateRequestProcessorChain getUpdateProcessorChain( SolrCore core, String requestedUpdateChain) { + UpdateRequestProcessorChain resolvedChain = null; if (requestedUpdateChain != null) { - UpdateRequestProcessorChain requestedChain = - core.getUpdateProcessingChain(requestedUpdateChain); - if (requestedChain != null) { - return requestedChain; + resolvedChain = core.getUpdateProcessingChain(requestedUpdateChain); + if (resolvedChain != null) { + return resolvedChain; } log.warn( "Requested update chain {} not found for core {}, falling back to default", @@ -319,7 +318,11 @@ private UpdateRequestProcessorChain getUpdateProcessorChain( } } - return core.getUpdateProcessingChain(updateChainName); + resolvedChain = core.getUpdateProcessingChain(updateChainName); + if (resolvedChain == null) { + resolvedChain = core.getUpdateProcessingChain(null); + } + return resolvedChain; } /* From c98418b28c12b9c39e36d9cec84c3f1d989300e5 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Sun, 14 Dec 2025 01:09:13 -0500 Subject: [PATCH 09/24] fix getUpdateProcessorChain() --- .../handler/admin/api/UpgradeCoreIndex.java | 87 ++++++++----------- 1 file changed, 36 insertions(+), 51 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index cc6bc3a56e5..68b5619d502 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -291,9 +291,10 @@ private int getIndexCreatedVersionMajor(SolrCore core) { private UpdateRequestProcessorChain getUpdateProcessorChain( SolrCore core, String requestedUpdateChain) { - UpdateRequestProcessorChain resolvedChain = null; + // 1. Try explicitly requested chain first if (requestedUpdateChain != null) { - resolvedChain = core.getUpdateProcessingChain(requestedUpdateChain); + UpdateRequestProcessorChain resolvedChain = + core.getUpdateProcessingChain(requestedUpdateChain); if (resolvedChain != null) { return resolvedChain; } @@ -303,25 +304,46 @@ private UpdateRequestProcessorChain getUpdateProcessorChain( core.getName()); } + // 2. Try to find chain configured in /update handler + String updateChainName = null; SolrRequestHandler reqHandler = core.getRequestHandler("/update"); - NamedList initArgs = ((RequestHandlerBase) reqHandler).getInitArgs(); - String updateChainName = null; - Object defaults = initArgs.get("defaults"); - if (defaults != null && defaults instanceof NamedList) { - updateChainName = (String) (((NamedList) defaults).get(UpdateParams.UPDATE_CHAIN)); - } - if (updateChainName == null) { - Object invariants = initArgs.get("invariants"); - if (invariants != null && invariants instanceof NamedList) { - updateChainName = (String) (((NamedList) invariants).get(UpdateParams.UPDATE_CHAIN)); + if (reqHandler instanceof RequestHandlerBase) { + NamedList initArgs = ((RequestHandlerBase) reqHandler).getInitArgs(); + + if (initArgs != null) { + // Check defaults first + Object defaults = initArgs.get("defaults"); + if (defaults instanceof NamedList) { + updateChainName = (String) ((NamedList) defaults).get(UpdateParams.UPDATE_CHAIN); + } + + // Check invariants if not found in defaults + if (updateChainName == null) { + Object invariants = initArgs.get("invariants"); + if (invariants instanceof NamedList) { + updateChainName = (String) ((NamedList) invariants).get(UpdateParams.UPDATE_CHAIN); + } + } } + } else { + log.warn( + "Expected /update handler to be RequestHandlerBase, but got {}", + reqHandler == null ? "null" : reqHandler.getClass().getName()); } - resolvedChain = core.getUpdateProcessingChain(updateChainName); - if (resolvedChain == null) { + // 3. Try to get the chain by name (or default if name is null) + UpdateRequestProcessorChain resolvedChain = core.getUpdateProcessingChain(updateChainName); + + if (resolvedChain == null && updateChainName != null) { + // Chain name was configured but doesn't exist, fall back to default + log.warn( + "Update chain {} configured in /update handler not found for core {}, using default", + updateChainName, + core.getName()); resolvedChain = core.getUpdateProcessingChain(null); } + return resolvedChain; } @@ -395,23 +417,6 @@ private void doCommit(SolrCore core) { } } - private void deleteDummyDocAndCommit(SolrCore core, String dummyContentId) throws Exception { - UpdateRequest updateReq = new UpdateRequest(); - updateReq.deleteById(dummyContentId); - log.debug("Deleting dummy doc with id: {}", dummyContentId); - ModifiableSolrParams msp = new ModifiableSolrParams(); - - msp.add("commit", "true"); - LocalSolrQueryRequest solrReq; - try { - solrReq = getLocalUpdateReq(updateReq, core, msp); - doLocalUpdateReq(solrReq, core); - } catch (Exception e) { - log.error("Error deleting dummy doc"); - throw e; - } - } - public LocalSolrQueryRequest getLocalUpdateReq( UpdateRequest updateReq, SolrCore core, ModifiableSolrParams msp) throws IOException { LocalSolrQueryRequest solrReq = new LocalSolrQueryRequest(core, msp); @@ -433,26 +438,6 @@ public static void doLocalUpdateReq(LocalSolrQueryRequest solrReq, SolrCore core } } - private SolrInputDocument getDummyDoc(SolrCore core) { - SolrInputDocument dummyDoc = new SolrInputDocument(); - String dummyContentId = "cvrx-dummydoc" + UUID.randomUUID().toString(); - String uniqeKeyFieldName = core.getLatestSchema().getUniqueKeyField().getName(); - dummyDoc.addField(uniqeKeyFieldName, dummyContentId); - Collection requiredFields = core.getLatestSchema().getRequiredFields(); - - for (SchemaField sf : requiredFields) { - if (sf.getName().equals(uniqeKeyFieldName) || sf.getDefaultValue() != null) { - continue; - } - if (sf.getType() instanceof DateValueFieldType) { - dummyDoc.addField(sf.getName(), new Date()); - } else { - dummyDoc.addField(sf.getName(), "1"); - } - } - return dummyDoc; - } - private boolean processSegment( LeafReaderContext leafReaderContext, UpdateRequestProcessorChain processorChain, From 57b58421fe39d88cf86ecf80f408b07cde0c7567 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Sun, 14 Dec 2025 01:21:53 -0500 Subject: [PATCH 10/24] gradlew tidy --- .../org/apache/solr/handler/admin/api/UpgradeCoreIndex.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index 68b5619d502..add6ac9af7f 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -4,11 +4,8 @@ import java.lang.invoke.MethodHandles; import java.nio.file.Paths; import java.time.format.DateTimeFormatter; -import java.util.Collection; -import java.util.Date; import java.util.List; import java.util.Set; -import java.util.UUID; import org.apache.lucene.document.Document; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FilterLeafReader; @@ -39,7 +36,6 @@ import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrRequestHandler; import org.apache.solr.response.SolrQueryResponse; -import org.apache.solr.schema.DateValueFieldType; import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.SchemaField; import org.apache.solr.search.DocValuesIteratorCache; From abd584e3881b3b49d0a19d93f149579a28e5df29 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Sun, 14 Dec 2025 02:28:22 -0500 Subject: [PATCH 11/24] fix getUpdateProcessorChain() --- .../handler/admin/api/UpgradeCoreIndex.java | 56 +++++++------------ 1 file changed, 21 insertions(+), 35 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index add6ac9af7f..2804aaf6ce0 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -287,60 +287,46 @@ private int getIndexCreatedVersionMajor(SolrCore core) { private UpdateRequestProcessorChain getUpdateProcessorChain( SolrCore core, String requestedUpdateChain) { - // 1. Try explicitly requested chain first + // Try explicitly requested chain first if (requestedUpdateChain != null) { UpdateRequestProcessorChain resolvedChain = core.getUpdateProcessingChain(requestedUpdateChain); if (resolvedChain != null) { return resolvedChain; + } else { + log.error( + "UPGRADECOREINDEX:: Requested update chain {} not found for core {}", + requestedUpdateChain, + core.getName()); + // TO-DO + // Throw exception wrapped in CoreAdminAPIBaseException } - log.warn( - "Requested update chain {} not found for core {}, falling back to default", - requestedUpdateChain, - core.getName()); } - // 2. Try to find chain configured in /update handler + // Try to find chain configured in /update handler String updateChainName = null; SolrRequestHandler reqHandler = core.getRequestHandler("/update"); - if (reqHandler instanceof RequestHandlerBase) { - NamedList initArgs = ((RequestHandlerBase) reqHandler).getInitArgs(); + NamedList initArgs = ((RequestHandlerBase) reqHandler).getInitArgs(); + + if (initArgs != null) { + // Check invariants first + Object invariants = initArgs.get("invariants"); + if (invariants instanceof NamedList) { + updateChainName = (String) ((NamedList) invariants).get(UpdateParams.UPDATE_CHAIN); + } - if (initArgs != null) { - // Check defaults first + // Check defaults if not found in invariants + if (updateChainName == null) { Object defaults = initArgs.get("defaults"); if (defaults instanceof NamedList) { updateChainName = (String) ((NamedList) defaults).get(UpdateParams.UPDATE_CHAIN); } - - // Check invariants if not found in defaults - if (updateChainName == null) { - Object invariants = initArgs.get("invariants"); - if (invariants instanceof NamedList) { - updateChainName = (String) ((NamedList) invariants).get(UpdateParams.UPDATE_CHAIN); - } - } } - } else { - log.warn( - "Expected /update handler to be RequestHandlerBase, but got {}", - reqHandler == null ? "null" : reqHandler.getClass().getName()); - } - - // 3. Try to get the chain by name (or default if name is null) - UpdateRequestProcessorChain resolvedChain = core.getUpdateProcessingChain(updateChainName); - - if (resolvedChain == null && updateChainName != null) { - // Chain name was configured but doesn't exist, fall back to default - log.warn( - "Update chain {} configured in /update handler not found for core {}, using default", - updateChainName, - core.getName()); - resolvedChain = core.getUpdateProcessingChain(null); } - return resolvedChain; + // default chain is returned if updateChainName is null + return core.getUpdateProcessingChain(updateChainName); } /* From bdfa291e3a62ca5076a507802ac9f79811b48c1a Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Sun, 21 Dec 2025 02:09:23 -0500 Subject: [PATCH 12/24] 1) incorrect LeafReader closure.2) Fix validation --- .../handler/admin/api/UpgradeCoreIndex.java | 119 ++++++------------ 1 file changed, 38 insertions(+), 81 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index 2804aaf6ce0..310176d820a 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -140,24 +140,13 @@ public SolrJerseyResponse upgradeCoreIndex( continue; } - boolean segmentError = true; - - for (int i = 0; i < SEGMENT_ERROR_RETRIES; i++) { - // retrying segment; I anticipate throttling to be the main reason in most - // cases - // hence the sleep - if (i > 0) { - Thread.sleep(5 * 60 * 1000); // 5 minutes - } + boolean success = false; - segmentError = processSegment(lrc, updateProcessorChain, core, dvICache); + success = processSegment(lrc, updateProcessorChain, core, dvICache); - if (!segmentError) { - break; - } - } - if (segmentError) { + if (!success) { coreRxStatus = CoreReindexingStatus.ERROR; + break; } } } catch (Exception e) { @@ -165,6 +154,9 @@ public SolrJerseyResponse upgradeCoreIndex( coreRxStatus = CoreReindexingStatus.ERROR; } + // TO-DO + // Prepare SolrjerseyResponse if coreRxStatus==ERROR at this point + try { RefCounted iwRef = core.getSolrCoreState().getIndexWriter(null); if (iwRef != null) { @@ -189,69 +181,33 @@ public SolrJerseyResponse upgradeCoreIndex( // SolrCore.getSearcher() method ssearcherRef.decref(); - // IF coreRxStatus == CoreReindexingStatus.REINDEXING_PAUSED at this point then most - // likely it - // reached here - // by breaking out of segment processing. So we are going straight to setting the state - // and - // publishing to reindexing_status.csv - if (coreRxStatus != CoreReindexingStatus.REINDEXING_PAUSED) { + Boolean validationResult = false; + for (int i = 0; + (i < RETRY_COUNT_FOR_SEGMENT_DELETION) + && (validationResult != null && !validationResult); + i++) { try { - if (coreRxStatus == CoreReindexingStatus.ERROR) { - log.error("Core CoreReindexingStatus returned error, not calling commit"); - } else { - Boolean validationResult = false; - for (int i = 0; - (i < RETRY_COUNT_FOR_SEGMENT_DELETION) - && (validationResult != null && !validationResult); - i++) { - - doCommit(core); - Thread.sleep(SLEEP_TIME_BEFORE_AFTER_COMMIT_MS); - - validationResult = validateSegmentsUpdated(core); - log.warn( - "validateSegmentsUpdated() returned: {} for core: {}, sleeping for {}ms before calling commit...", - validationResult, - coreName, - SLEEP_TIME_SEGMENT_DELETION_MS); - Thread.sleep(SLEEP_TIME_SEGMENT_DELETION_MS); - } - if ((validationResult == null) - || (validationResult != null && !validationResult)) { - log.error( - "Validation failed for core: {}, not increasing indexCreatedVersionMajor", - validationResult, - coreName); - coreRxStatus = CoreReindexingStatus.ERROR; - } else { - - doCommit(core); - Thread.sleep(SLEEP_TIME_BEFORE_AFTER_COMMIT_MS); - - int indexCreatedVersionMajorAfterCommit = getIndexCreatedVersionMajor(core); - log.info( - "Post processing coreName: {}, indexCreatedVersionMajorAfterCommit: {}", - coreName, - indexCreatedVersionMajorAfterCommit); - if (indexCreatedVersionMajorAfterCommit == Version.LATEST.major) { - log.info( - "Core: {} index version updated successfully to {}", - coreName, - Version.LATEST); - coreRxStatus = CoreReindexingStatus.PROCESSED; - } else { - log.error( - "indexCreatedVersionMajorAfterCommit is {}", - indexCreatedVersionMajorAfterCommit); - coreRxStatus = CoreReindexingStatus.ERROR; - } - } - } - } catch (Exception e) { - log.error("Exception in processCore: {}", e.toString()); + doCommit(core); + Thread.sleep(SLEEP_TIME_BEFORE_AFTER_COMMIT_MS); + + validationResult = validateSegmentsUpdated(core); + log.warn( + "validateSegmentsUpdated() returned: {} for core: {}, sleeping for {}ms before calling commit...", + validationResult, + coreName, + SLEEP_TIME_SEGMENT_DELETION_MS); + Thread.sleep(SLEEP_TIME_SEGMENT_DELETION_MS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); } } + + if ((validationResult == null) || (validationResult != null && !validationResult)) { + log.error( + "Validation failed for core: {}, Some older segments still present despite 100% deleted docs", + validationResult, coreName); + coreRxStatus = CoreReindexingStatus.ERROR; + } } return null; }); @@ -259,12 +215,13 @@ public SolrJerseyResponse upgradeCoreIndex( private boolean shouldUpgradeSegment(LeafReaderContext lrc) { Version segmentMinVersion = null; - try (LeafReader leafReader = lrc.reader()) { - segmentMinVersion = leafReader.getMetaData().minVersion(); - } catch (IOException ex) { - // TO-DO - // Wrap exception in CoreAdminAPIBaseException - } + + LeafReader leafReader = lrc.reader(); + leafReader = FilterLeafReader.unwrap(leafReader); + + SegmentCommitInfo si = ((SegmentReader) leafReader).getSegmentInfo(); + segmentMinVersion = si.info.getMinVersion(); + return (segmentMinVersion == null || segmentMinVersion.major < Version.LATEST.major); } From feb70ac7f0041ae53f8403bc1cb96a9c928401cf Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Sun, 21 Dec 2025 12:42:16 -0500 Subject: [PATCH 13/24] cleanup --- .../handler/admin/api/UpgradeCoreIndex.java | 105 ++++++------------ 1 file changed, 34 insertions(+), 71 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index 310176d820a..f5ceec0e316 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -89,8 +89,6 @@ public enum ReindexingThreadState { WAITING; } - private static final int SEGMENT_ERROR_RETRIES = 3; - private static final long SLEEP_TIME_BEFORE_AFTER_COMMIT_MS = 10000; private static final int RETRY_COUNT_FOR_SEGMENT_DELETION = 5; private static final long SLEEP_TIME_SEGMENT_DELETION_MS = 60000; @@ -153,59 +151,39 @@ public SolrJerseyResponse upgradeCoreIndex( log.error("Error while processing core: {}, exception: {}", coreName, e.toString()); coreRxStatus = CoreReindexingStatus.ERROR; } + // important to decrement searcher ref count after use since we obtained it via the + // SolrCore.getSearcher() method + ssearcherRef.decref(); // TO-DO // Prepare SolrjerseyResponse if coreRxStatus==ERROR at this point + doCommit(core); try { - RefCounted iwRef = core.getSolrCoreState().getIndexWriter(null); - if (iwRef != null) { - IndexWriter iw = iwRef.get(); - try { - if (iw != null) { - iw.commit(); - } else { - log.warn("IndexWriter for core {} is null", core.getName()); - } - } finally { - iwRef.decref(); - } - } else { - log.warn("IWRef for core {} is null", core.getName()); - } - } catch (IOException ioEx) { - + Thread.sleep(10000); + } catch (InterruptedException ie) { + // we don't have to preserve the interrupt here } + /* + There is a delay observed sometimes between when a commit happens and + when the segment (with zero live docs) gets cleared. So adding a validation check with retries. + */ + Boolean indexUpgraded = validateSegmentsUpdated(core); - // important to decrement searcher ref count after use since we obtained it via the - // SolrCore.getSearcher() method - ssearcherRef.decref(); - - Boolean validationResult = false; - for (int i = 0; - (i < RETRY_COUNT_FOR_SEGMENT_DELETION) - && (validationResult != null && !validationResult); - i++) { + for (int i = 0; i < RETRY_COUNT_FOR_SEGMENT_DELETION && !indexUpgraded; i++) { try { doCommit(core); - Thread.sleep(SLEEP_TIME_BEFORE_AFTER_COMMIT_MS); - - validationResult = validateSegmentsUpdated(core); - log.warn( - "validateSegmentsUpdated() returned: {} for core: {}, sleeping for {}ms before calling commit...", - validationResult, - coreName, - SLEEP_TIME_SEGMENT_DELETION_MS); Thread.sleep(SLEEP_TIME_SEGMENT_DELETION_MS); + indexUpgraded = validateSegmentsUpdated(core); } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); + // we don't have to preserve the interrupt here } } - if ((validationResult == null) || (validationResult != null && !validationResult)) { + if (indexUpgraded == null || !indexUpgraded) { log.error( - "Validation failed for core: {}, Some older segments still present despite 100% deleted docs", - validationResult, coreName); + "Validation failed for core '{}'. Some older version segments still remain (likely despite 100% deleted docs).", + coreName); coreRxStatus = CoreReindexingStatus.ERROR; } } @@ -340,40 +318,25 @@ private Boolean validateSegmentsUpdated(SolrCore core) { } private void doCommit(SolrCore core) { + RefCounted iwRef = null; try { - UpdateRequest updateReq = new UpdateRequest(); - - ModifiableSolrParams msp = new ModifiableSolrParams(); - - msp.add("commit", "true"); - LocalSolrQueryRequest solrReq; - solrReq = getLocalUpdateReq(updateReq, core, msp); - doLocalUpdateReq(solrReq, core); - - } catch (Exception e) { - log.error( - "Error while sending update request to advance index created version {}", e.toString()); - } - } - - public LocalSolrQueryRequest getLocalUpdateReq( - UpdateRequest updateReq, SolrCore core, ModifiableSolrParams msp) throws IOException { - LocalSolrQueryRequest solrReq = new LocalSolrQueryRequest(core, msp); - solrReq.setContentStreams(updateReq.getContentStreams()); - return solrReq; - } - - public static void doLocalUpdateReq(LocalSolrQueryRequest solrReq, SolrCore core) { - try { - SolrQueryResponse resp = new SolrQueryResponse(); - core.getRequestHandler("/update").handleRequest(solrReq, resp); - if (resp.getException() != null) { - log.error("doLocalUpdateReq error: {}", resp.getException().toString()); + iwRef = core.getSolrCoreState().getIndexWriter(null); + if (iwRef != null) { + IndexWriter iw = iwRef.get(); + + if (iw != null) { + iw.commit(); + } else { + log.warn("IndexWriter for core {} is null", core.getName()); + } + } else { + log.warn("IWRef for core {} is null", core.getName()); } - } catch (Exception e) { - log.error("Exception in doLocalUpdateReq: {}", e.toString()); + } catch (IOException ioEx) { + log.warn( + String.format("Error commiting on core {} during index upgrade", core.getName()), ioEx); } finally { - solrReq.close(); + iwRef.decref(); } } From 393c081010ef2c77b7a01678fb888f5327f76b18 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Mon, 22 Dec 2025 01:34:01 -0500 Subject: [PATCH 14/24] interrupt handling and validation cleanup --- .../handler/admin/api/UpgradeCoreIndex.java | 71 +++++++------------ 1 file changed, 25 insertions(+), 46 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index f5ceec0e316..3d40c23067e 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -15,7 +15,6 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentCommitInfo; -import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.StoredFields; import org.apache.lucene.store.FSDirectory; @@ -23,7 +22,6 @@ import org.apache.lucene.util.Version; import org.apache.solr.client.api.model.SolrJerseyResponse; import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; -import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.UpdateParams; @@ -160,27 +158,36 @@ public SolrJerseyResponse upgradeCoreIndex( doCommit(core); try { + // giving some time for 0 doc segments to clear up Thread.sleep(10000); } catch (InterruptedException ie) { - // we don't have to preserve the interrupt here + /* We don't need to preserve the interrupt here. + Otherwise, we may get immediately interrupted when we try to call sleep() during validation in the next steps. + And we are almost done anyway. + */ } + + boolean indexUpgraded = isIndexUpgraded(core); + /* There is a delay observed sometimes between when a commit happens and when the segment (with zero live docs) gets cleared. So adding a validation check with retries. */ - Boolean indexUpgraded = validateSegmentsUpdated(core); - for (int i = 0; i < RETRY_COUNT_FOR_SEGMENT_DELETION && !indexUpgraded; i++) { try { doCommit(core); - Thread.sleep(SLEEP_TIME_SEGMENT_DELETION_MS); - indexUpgraded = validateSegmentsUpdated(core); + Thread.sleep(10000); + indexUpgraded = isIndexUpgraded(core); + } catch (InterruptedException ie) { - // we don't have to preserve the interrupt here + Thread.currentThread().interrupt(); + } + if (Thread.currentThread().isInterrupted()) { + break; } } - if (indexUpgraded == null || !indexUpgraded) { + if (!indexUpgraded) { log.error( "Validation failed for core '{}'. Some older version segments still remain (likely despite 100% deleted docs).", coreName); @@ -203,21 +210,6 @@ private boolean shouldUpgradeSegment(LeafReaderContext lrc) { return (segmentMinVersion == null || segmentMinVersion.major < Version.LATEST.major); } - private int getIndexCreatedVersionMajor(SolrCore core) { - int indexCreatedVersionMajor = 0; - try (FSDirectory dir = FSDirectory.open(Paths.get(core.getIndexDir()))) { - SegmentInfos sis = SegmentInfos.readLatestCommit(dir); - indexCreatedVersionMajor = sis.getIndexCreatedVersionMajor(); - } catch (Exception e) { - log.error( - "Error while opening segmentInfos for core: {}, exception: {}", - core.getName(), - e.toString()); - } - - return indexCreatedVersionMajor; - } - @SuppressWarnings({"rawtypes"}) private UpdateRequestProcessorChain getUpdateProcessorChain( SolrCore core, String requestedUpdateChain) { @@ -264,16 +256,8 @@ private UpdateRequestProcessorChain getUpdateProcessorChain( return core.getUpdateProcessingChain(updateChainName); } - /* - * returns: - * - * null: For any error or if there is at least one older version segment present in the index - * false: For any 0 older version segment present in the index having 0 numDocs - * true: If all segments are LATEST version - * - */ - private Boolean validateSegmentsUpdated(SolrCore core) { - Boolean segmentsUpdated = null; + private boolean isIndexUpgraded(SolrCore core) { + try (FSDirectory dir = FSDirectory.open(Paths.get(core.getIndexDir())); IndexReader reader = DirectoryReader.open(dir)) { @@ -282,7 +266,7 @@ private Boolean validateSegmentsUpdated(SolrCore core) { // no segments to process/validate return true; } - segmentsUpdated = true; + for (LeafReaderContext lrc : leaves) { LeafReader leafReader = lrc.reader(); leafReader = FilterLeafReader.unwrap(leafReader); @@ -292,29 +276,24 @@ private Boolean validateSegmentsUpdated(SolrCore core) { Version segMinVersion = si.info.getMinVersion(); if (segMinVersion == null || segMinVersion.major != Version.LATEST.major) { log.warn( - "validateSegmentsUpdated(): Core: {}, Segment {} is still at minVersion: {} and is not updated to the latest version {}", + "isIndexUpgraded(): Core: {}, Segment [{}] is still at minVersion [{}] and is not updated to the latest version [{}]; numLiveDocs: [{}]", core.getName(), si.info.name, (segMinVersion == null ? 6 : segMinVersion.major), - Version.LATEST.major); - segmentsUpdated = null; - // Since we could have 1 0-numDoc segment and multiple non-zero numDoc - // older version segments, we break only if a 0-numDoc segment is found - if (segmentReader.numDocs() == 0) { - segmentsUpdated = false; - break; - } + Version.LATEST.major, + segmentReader.numDocs()); + return false; } } } + return true; } catch (Exception e) { log.error( "Error while opening segmentInfos for core: {}, exception: {}", core.getName(), e.toString()); - segmentsUpdated = null; } - return segmentsUpdated; + return false; } private void doCommit(SolrCore core) { From b335e8ee4c39e815de8e40b4b7be74dcb8e33ee0 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Mon, 22 Dec 2025 23:20:36 -0500 Subject: [PATCH 15/24] async mode may cause merge policy to be reset immediately --- .../handler/admin/UpgradeCoreIndexOp.java | 43 +---- .../handler/admin/api/UpgradeCoreIndex.java | 171 ++++++++++-------- 2 files changed, 107 insertions(+), 107 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java index b29abdaa14c..e5a803baac5 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java @@ -1,19 +1,15 @@ package org.apache.solr.handler.admin; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.MergePolicy; import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; +import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.UpdateParams; import org.apache.solr.core.CoreContainer; -import org.apache.solr.core.SolrCore; import org.apache.solr.handler.admin.api.UpgradeCoreIndex; import org.apache.solr.handler.api.V2ApiUtils; -import org.apache.solr.index.LatestVersionFilterMergePolicy; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; -import org.apache.solr.util.RefCounted; public class UpgradeCoreIndexOp implements CoreAdminHandler.CoreAdminOp { @FunctionalInterface @@ -38,37 +34,12 @@ public void execute(CoreAdminHandler.CallInfo it) throws Exception { String cname = params.required().get(CoreAdminParams.CORE); final var requestBody = new UpgradeCoreIndexRequestBody(); requestBody.updateChain = params.get(UpdateParams.UPDATE_CHAIN); - RefCounted iwRef = null; - IndexWriter iw = null; - MergePolicy originalMergePolicy = null; - SolrCore core = it.req.getCore(); - try { - iwRef = core.getSolrCoreState().getIndexWriter(core); + requestBody.async = params.get(CommonAdminParams.ASYNC); - // set LatestVersionFilterMergePolicy as the merge policy to prevent older segments from - // participating in merges while we reindex - if (iwRef != null) { - iw = iwRef.get(); - } - if (iw != null) { - originalMergePolicy = iw.getConfig().getMergePolicy(); - iw.getConfig() - .setMergePolicy(new LatestVersionFilterMergePolicy(iw.getConfig().getMergePolicy())); - } - UpgradeCoreIndex upgradeCoreIndexApi = - UPGRADE_CORE_INDEX_FACTORY.create( - it.handler.coreContainer, it.handler.coreAdminAsyncTracker, it.req, it.rsp); - final var response = upgradeCoreIndexApi.upgradeCoreIndex(cname, requestBody); - V2ApiUtils.squashIntoSolrResponseWithoutHeader(it.rsp, response); - } finally { - // reset original merge policy - if (iw != null && originalMergePolicy != null) { - iw.getConfig().setMergePolicy(originalMergePolicy); - } - - if (iwRef != null) { - iwRef.decref(); - } - } + UpgradeCoreIndex upgradeCoreIndexApi = + UPGRADE_CORE_INDEX_FACTORY.create( + it.handler.coreContainer, it.handler.coreAdminAsyncTracker, it.req, it.rsp); + final var response = upgradeCoreIndexApi.upgradeCoreIndex(cname, requestBody); + V2ApiUtils.squashIntoSolrResponseWithoutHeader(it.rsp, response); } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index 3d40c23067e..c9b0dc014c6 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -3,7 +3,6 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.nio.file.Paths; -import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Set; import org.apache.lucene.document.Document; @@ -14,6 +13,7 @@ import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.StoredFields; @@ -30,6 +30,7 @@ import org.apache.solr.core.SolrCore; import org.apache.solr.handler.RequestHandlerBase; import org.apache.solr.handler.admin.CoreAdminHandler; +import org.apache.solr.index.LatestVersionFilterMergePolicy; import org.apache.solr.request.LocalSolrQueryRequest; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrRequestHandler; @@ -88,10 +89,6 @@ public enum ReindexingThreadState { } private static final int RETRY_COUNT_FOR_SEGMENT_DELETION = 5; - private static final long SLEEP_TIME_SEGMENT_DELETION_MS = 60000; - - private static final DateTimeFormatter formatter = - DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss"); public UpgradeCoreIndex( CoreContainer coreContainer, @@ -121,77 +118,108 @@ public SolrJerseyResponse upgradeCoreIndex( log.info("Received UPGRADECOREINDEX request for core: {}", core.getName()); CoreReindexingStatus coreRxStatus = CoreReindexingStatus.REINDEXING_ACTIVE; - RefCounted ssearcherRef = core.getSearcher(); - List leafContexts = - ssearcherRef.get().getTopReaderContext().leaves(); - DocValuesIteratorCache dvICache = new DocValuesIteratorCache(ssearcherRef.get()); - - UpdateRequestProcessorChain updateProcessorChain = - getUpdateProcessorChain(core, requestBody.updateChain); + // Set LatestVersionFilterMergePolicy to prevent older segments from + // participating in merges while we reindex. + // This must be done inside the async lambda to ensure it stays in effect + // for the duration of the reindexing operation. + RefCounted iwRef = null; + MergePolicy originalMergePolicy = null; try { - - for (LeafReaderContext lrc : leafContexts) { - if (!shouldUpgradeSegment(lrc)) { - continue; + iwRef = core.getSolrCoreState().getIndexWriter(core); + if (iwRef != null) { + IndexWriter iw = iwRef.get(); + if (iw != null) { + originalMergePolicy = iw.getConfig().getMergePolicy(); + iw.getConfig() + .setMergePolicy( + new LatestVersionFilterMergePolicy(iw.getConfig().getMergePolicy())); } + } - boolean success = false; + RefCounted ssearcherRef = core.getSearcher(); + try { + List leafContexts = + ssearcherRef.get().getTopReaderContext().leaves(); + DocValuesIteratorCache dvICache = new DocValuesIteratorCache(ssearcherRef.get()); - success = processSegment(lrc, updateProcessorChain, core, dvICache); + UpdateRequestProcessorChain updateProcessorChain = + getUpdateProcessorChain(core, requestBody.updateChain); - if (!success) { - coreRxStatus = CoreReindexingStatus.ERROR; - break; - } - } - } catch (Exception e) { - log.error("Error while processing core: {}, exception: {}", coreName, e.toString()); - coreRxStatus = CoreReindexingStatus.ERROR; - } - // important to decrement searcher ref count after use since we obtained it via the - // SolrCore.getSearcher() method - ssearcherRef.decref(); + for (LeafReaderContext lrc : leafContexts) { + if (!shouldUpgradeSegment(lrc)) { + continue; + } - // TO-DO - // Prepare SolrjerseyResponse if coreRxStatus==ERROR at this point + boolean success = + processSegment(lrc, updateProcessorChain, core, ssearcherRef.get(), dvICache); - doCommit(core); - try { - // giving some time for 0 doc segments to clear up - Thread.sleep(10000); - } catch (InterruptedException ie) { - /* We don't need to preserve the interrupt here. - Otherwise, we may get immediately interrupted when we try to call sleep() during validation in the next steps. - And we are almost done anyway. - */ - } + if (!success) { + coreRxStatus = CoreReindexingStatus.ERROR; + break; + } + } + } catch (Exception e) { + log.error("Error while processing core: {}", coreName, e); + coreRxStatus = CoreReindexingStatus.ERROR; + } finally { + // important to decrement searcher ref count after use since we obtained it via the + // SolrCore.getSearcher() method + ssearcherRef.decref(); + } - boolean indexUpgraded = isIndexUpgraded(core); + // TO-DO + // Prepare SolrjerseyResponse if coreRxStatus==ERROR at this point - /* - There is a delay observed sometimes between when a commit happens and - when the segment (with zero live docs) gets cleared. So adding a validation check with retries. - */ - for (int i = 0; i < RETRY_COUNT_FOR_SEGMENT_DELETION && !indexUpgraded; i++) { + doCommit(core); try { - doCommit(core); + // giving some time for 0 doc segments to clear up Thread.sleep(10000); - indexUpgraded = isIndexUpgraded(core); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); + /* We don't need to preserve the interrupt here. + Otherwise, we may get immediately interrupted when we try to call sleep() during validation in the next steps. + And we are almost done anyway. + */ } - if (Thread.currentThread().isInterrupted()) { - break; + + boolean indexUpgraded = isIndexUpgraded(core); + + /* + There is a delay observed sometimes between when a commit happens and + when the segment (with zero live docs) gets cleared. So adding a validation check with retries. + */ + for (int i = 0; i < RETRY_COUNT_FOR_SEGMENT_DELETION && !indexUpgraded; i++) { + try { + doCommit(core); + Thread.sleep(10000); + indexUpgraded = isIndexUpgraded(core); + + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + if (Thread.currentThread().isInterrupted()) { + break; + } } - } - if (!indexUpgraded) { - log.error( - "Validation failed for core '{}'. Some older version segments still remain (likely despite 100% deleted docs).", - coreName); - coreRxStatus = CoreReindexingStatus.ERROR; + if (!indexUpgraded) { + log.error( + "Validation failed for core '{}'. Some older version segments still remain (likely despite 100% deleted docs).", + coreName); + coreRxStatus = CoreReindexingStatus.ERROR; + } + } catch (IOException ioEx) { + // TO-DO + // Throw exception wrapped in CoreAdminAPIBaseException + } finally { + // Restore original merge policy + if (iwRef != null) { + IndexWriter iw = iwRef.get(); + if (iw != null && originalMergePolicy != null) { + iw.getConfig().setMergePolicy(originalMergePolicy); + } + iwRef.decref(); + } } } return null; @@ -315,7 +343,9 @@ private void doCommit(SolrCore core) { log.warn( String.format("Error commiting on core {} during index upgrade", core.getName()), ioEx); } finally { - iwRef.decref(); + if (iwRef != null) { + iwRef.decref(); + } } } @@ -323,9 +353,10 @@ private boolean processSegment( LeafReaderContext leafReaderContext, UpdateRequestProcessorChain processorChain, SolrCore core, + SolrIndexSearcher solrIndexSearcher, DocValuesIteratorCache dvICache) { - boolean segmentError = false; + boolean success = false; int numDocsProcessed = 0; String coreName = core.getName(); @@ -337,11 +368,10 @@ private boolean processSegment( SolrInputDocument solrDoc = null; UpdateRequestProcessor processor = null; LocalSolrQueryRequest solrRequest = null; - RefCounted searcherRef = core.getSearcher(); - SolrDocumentFetcher docFetcher = searcherRef.get().getDocFetcher(); + SolrDocumentFetcher docFetcher = solrIndexSearcher.getDocFetcher(); try { // Exclude copy field targets to avoid duplicating values on reindex - Set fields = docFetcher.getNonStoredDVsWithoutCopyTargets(); + Set nonStoredDVFields = docFetcher.getNonStoredDVsWithoutCopyTargets(); solrRequest = new LocalSolrQueryRequest(core, new ModifiableSolrParams()); SolrQueryResponse rsp = new SolrQueryResponse(); @@ -356,36 +386,35 @@ private boolean processSegment( solrDoc = toSolrInputDocument(doc, indexSchema); docFetcher.decorateDocValueFields( - solrDoc, leafReaderContext.docBase + luceneDocId, fields, dvICache); + solrDoc, leafReaderContext.docBase + luceneDocId, nonStoredDVFields, dvICache); solrDoc.removeField("_version_"); AddUpdateCommand currDocCmd = new AddUpdateCommand(solrRequest); currDocCmd.solrDoc = solrDoc; processor.processAdd(currDocCmd); numDocsProcessed++; } + success = true; } catch (Exception e) { log.error("Error in CvReindexingTask process() : {}", e.toString()); - segmentError = true; + } finally { if (processor != null) { try { processor.finish(); } catch (Exception e) { log.error("Exception while doing finish processor.finish() : {}", e.toString()); - segmentError = true; + } finally { try { processor.close(); } catch (IOException e) { log.error("Exception while closing processor: {}", e.toString()); - segmentError = true; } } } if (solrRequest != null) { solrRequest.close(); } - searcherRef.decref(); } log.info( @@ -394,7 +423,7 @@ private boolean processSegment( coreName, numDocsProcessed); - return segmentError; + return success; } /* From 6a354fda657583bbecd2d85c8550479e621d036a Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Tue, 23 Dec 2025 01:46:57 -0500 Subject: [PATCH 16/24] Fix null response --- .../org/apache/solr/handler/admin/api/UpgradeCoreIndex.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index c9b0dc014c6..640761462cb 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -222,7 +222,7 @@ when the segment (with zero live docs) gets cleared. So adding a validation chec } } } - return null; + return response; }); } From 088c5990945f13f420b59d853e81f7b569ac1ddf Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Mon, 29 Dec 2025 01:31:53 -0500 Subject: [PATCH 17/24] changelog --- ...-17725-CoreAdmin-API-to-upgrade-an-index-in-place.yml | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 changelog/unreleased/SOLR-17725-CoreAdmin-API-to-upgrade-an-index-in-place.yml diff --git a/changelog/unreleased/SOLR-17725-CoreAdmin-API-to-upgrade-an-index-in-place.yml b/changelog/unreleased/SOLR-17725-CoreAdmin-API-to-upgrade-an-index-in-place.yml new file mode 100644 index 00000000000..0ac334f8b56 --- /dev/null +++ b/changelog/unreleased/SOLR-17725-CoreAdmin-API-to-upgrade-an-index-in-place.yml @@ -0,0 +1,9 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: CoreAdmin API (/admin/cores?action=UPGRADECOREINDEX) to upgrade an index in-place +type: added +authors: + - name: Rahul Goswami +links: + - name: SOLR-17725 + url: https://issues.apache.org/jira/browse/SOLR-17725 + From 3655ef4dfb52e7669ec35f58996dd91be0d4972c Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Tue, 30 Dec 2025 11:52:33 -0500 Subject: [PATCH 18/24] LatestVersionMergePolicy renamed --- .../org/apache/solr/handler/admin/api/UpgradeCoreIndex.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index 640761462cb..d526417fe1f 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -30,7 +30,7 @@ import org.apache.solr.core.SolrCore; import org.apache.solr.handler.RequestHandlerBase; import org.apache.solr.handler.admin.CoreAdminHandler; -import org.apache.solr.index.LatestVersionFilterMergePolicy; +import org.apache.solr.index.LatestVersionMergePolicy; import org.apache.solr.request.LocalSolrQueryRequest; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrRequestHandler; @@ -119,7 +119,7 @@ public SolrJerseyResponse upgradeCoreIndex( log.info("Received UPGRADECOREINDEX request for core: {}", core.getName()); CoreReindexingStatus coreRxStatus = CoreReindexingStatus.REINDEXING_ACTIVE; - // Set LatestVersionFilterMergePolicy to prevent older segments from + // Set LatestVersionMergePolicy to prevent older segments from // participating in merges while we reindex. // This must be done inside the async lambda to ensure it stays in effect // for the duration of the reindexing operation. @@ -133,7 +133,7 @@ public SolrJerseyResponse upgradeCoreIndex( originalMergePolicy = iw.getConfig().getMergePolicy(); iw.getConfig() .setMergePolicy( - new LatestVersionFilterMergePolicy(iw.getConfig().getMergePolicy())); + new LatestVersionMergePolicy(iw.getConfig().getMergePolicy())); } } From a50fc8d9ae4f27c526b87d84faf49c610b9f2bce Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Tue, 30 Dec 2025 15:33:37 -0500 Subject: [PATCH 19/24] Fix issue with running in async mode --- .../java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java index e5a803baac5..c706535b6d6 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java @@ -1,7 +1,6 @@ package org.apache.solr.handler.admin; import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; -import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.UpdateParams; @@ -34,7 +33,6 @@ public void execute(CoreAdminHandler.CallInfo it) throws Exception { String cname = params.required().get(CoreAdminParams.CORE); final var requestBody = new UpgradeCoreIndexRequestBody(); requestBody.updateChain = params.get(UpdateParams.UPDATE_CHAIN); - requestBody.async = params.get(CommonAdminParams.ASYNC); UpgradeCoreIndex upgradeCoreIndexApi = UPGRADE_CORE_INDEX_FACTORY.create( From 36071105f66468d186ced4c1b55eae8024f11e1d Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Wed, 31 Dec 2025 01:53:21 -0500 Subject: [PATCH 20/24] Decorate response with bookkeeping info and upgrade state --- .../api/model/UpgradeCoreIndexResponse.java | 22 ++++ .../handler/admin/UpgradeCoreIndexOp.java | 16 ++- .../handler/admin/api/UpgradeCoreIndex.java | 111 +++++++----------- 3 files changed, 78 insertions(+), 71 deletions(-) create mode 100644 solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexResponse.java diff --git a/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexResponse.java b/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexResponse.java new file mode 100644 index 00000000000..f96d6a54173 --- /dev/null +++ b/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexResponse.java @@ -0,0 +1,22 @@ +package org.apache.solr.client.api.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.media.Schema; + +public class UpgradeCoreIndexResponse extends SolrJerseyResponse { + @Schema(description = "The name of the core.") + @JsonProperty + public String core; + + @Schema(description = "The total number of segments eligible for upgrade.") + @JsonProperty + public Integer numSegmentsEligibleForUpgrade; + + @Schema(description = "The number of segments successfully upgraded.") + @JsonProperty + public Integer numSegmentsUpgraded; + + @Schema(description = "Status of the core index upgrade operation.") + @JsonProperty + public String upgradeStatus; +} diff --git a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java index c706535b6d6..a5b9a9effad 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java @@ -1,9 +1,12 @@ package org.apache.solr.handler.admin; import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; +import org.apache.solr.client.api.model.UpgradeCoreIndexResponse; +import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.NamedList; import org.apache.solr.core.CoreContainer; import org.apache.solr.handler.admin.api.UpgradeCoreIndex; import org.apache.solr.handler.api.V2ApiUtils; @@ -31,13 +34,24 @@ public boolean isExpensive() { public void execute(CoreAdminHandler.CallInfo it) throws Exception { SolrParams params = it.req.getParams(); String cname = params.required().get(CoreAdminParams.CORE); + final boolean isAsync = params.get(CommonAdminParams.ASYNC) != null; final var requestBody = new UpgradeCoreIndexRequestBody(); requestBody.updateChain = params.get(UpdateParams.UPDATE_CHAIN); UpgradeCoreIndex upgradeCoreIndexApi = UPGRADE_CORE_INDEX_FACTORY.create( it.handler.coreContainer, it.handler.coreAdminAsyncTracker, it.req, it.rsp); - final var response = upgradeCoreIndexApi.upgradeCoreIndex(cname, requestBody); + final UpgradeCoreIndexResponse response = + upgradeCoreIndexApi.upgradeCoreIndex(cname, requestBody); V2ApiUtils.squashIntoSolrResponseWithoutHeader(it.rsp, response); + + if (isAsync) { + final var opResponse = new NamedList<>(); + V2ApiUtils.squashIntoNamedListWithoutHeader(opResponse, response); + // REQUESTSTATUS is returning the inner response NamedList as a positional array + // ([k1,v1,k2,v2...]). + // so converting to a map + it.rsp.addResponse(opResponse.asMap(1)); + } } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index d526417fe1f..02bfffc3ae0 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -20,8 +20,8 @@ import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.Version; -import org.apache.solr.client.api.model.SolrJerseyResponse; import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; +import org.apache.solr.client.api.model.UpgradeCoreIndexResponse; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.UpdateParams; @@ -50,42 +50,10 @@ public class UpgradeCoreIndex extends CoreAdminAPIBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - /* - * The re-indexing status at any point of time for a particular core. - * DEFAULT - This is the default status, meaning it is yet to be processed and checked if the version is LATEST - * for this core - * REINDEXING_ACTIVE - This is set at the start of the re-indexing operation - * PROCESSED - This is set at the end of the re-indexing operation if there are no errors - * ERROR - This is set if there is any error in any segment. This core will be retried CORE_ERROR_RETRIES number - * of - * times - * CORRECTVERSION - This is set if the core is already at the correct version - */ - public enum CoreReindexingStatus { - DEFAULT, - REINDEXING_ACTIVE, - REINDEXING_PAUSED, - PROCESSED, + public enum CoreIndexUpgradeStatus { + UPGRADE_SUCCESSFUL, ERROR, - CORRECTVERSION; - } - - /* - * The state that a single ReindexingThread would be in. This is set to START_REINDEXING by CPUMonitorTask - * thread - * START_REINDEXING - CPUMonitorTask checks if current CPU usage is below given threshold and sets this state - * WAITING - CPUMonitorTask checks if the CPU usage is above given threshold and sets this state to put - * the CVReindexingTask thread in a waiting state. Note that ReindexingTask thread run() will still be checked - * in this case. This can also be set when all cores have processed and there are no pending cores. - * STOP_REINDEXING - CPUMonitorTask checks if all cores are processed with the status between {CORRECTVERSION, - * ERROR} - * and if all cores are processed then sets it to STOP_REINDEXING - * - */ - public enum ReindexingThreadState { - START_REINDEXING, - STOP_REINDEXING, - WAITING; + NO_UPGRADE_NEEDED; } private static final int RETRY_COUNT_FOR_SEGMENT_DELETION = 5; @@ -103,10 +71,12 @@ public boolean isExpensive() { return true; } - public SolrJerseyResponse upgradeCoreIndex( + public UpgradeCoreIndexResponse upgradeCoreIndex( String coreName, UpgradeCoreIndexRequestBody requestBody) throws Exception { ensureRequiredParameterProvided("coreName", coreName); - SolrJerseyResponse response = instantiateJerseyResponse(SolrJerseyResponse.class); + + final UpgradeCoreIndexResponse response = + instantiateJerseyResponse(UpgradeCoreIndexResponse.class); return handlePotentiallyAsynchronousTask( response, @@ -117,14 +87,15 @@ public SolrJerseyResponse upgradeCoreIndex( try (SolrCore core = coreContainer.getCore(coreName)) { log.info("Received UPGRADECOREINDEX request for core: {}", core.getName()); - CoreReindexingStatus coreRxStatus = CoreReindexingStatus.REINDEXING_ACTIVE; // Set LatestVersionMergePolicy to prevent older segments from - // participating in merges while we reindex. - // This must be done inside the async lambda to ensure it stays in effect - // for the duration of the reindexing operation. + // participating in merges while we reindex. This is to prevent any older version + // segments from + // merging with any newly formed segments created due to reindexing and undoing the work + // we are doing. RefCounted iwRef = null; MergePolicy originalMergePolicy = null; + int numSegmentsEligibleForUpgrade = 0, numSegmentsUpgraded = 0; try { iwRef = core.getSolrCoreState().getIndexWriter(core); if (iwRef != null) { @@ -146,6 +117,18 @@ public SolrJerseyResponse upgradeCoreIndex( UpdateRequestProcessorChain updateProcessorChain = getUpdateProcessorChain(core, requestBody.updateChain); + for (LeafReaderContext lrc : leafContexts) { + if (shouldUpgradeSegment(lrc)) { + numSegmentsEligibleForUpgrade++; + } + } + if (numSegmentsEligibleForUpgrade == 0) { + response.core = coreName; + response.upgradeStatus = CoreIndexUpgradeStatus.NO_UPGRADE_NEEDED.toString(); + response.numSegmentsEligibleForUpgrade = 0; + return response; + } + for (LeafReaderContext lrc : leafContexts) { if (!shouldUpgradeSegment(lrc)) { continue; @@ -154,14 +137,16 @@ public SolrJerseyResponse upgradeCoreIndex( boolean success = processSegment(lrc, updateProcessorChain, core, ssearcherRef.get(), dvICache); - if (!success) { - coreRxStatus = CoreReindexingStatus.ERROR; + if (success) { + numSegmentsUpgraded++; + } else { + response.upgradeStatus = CoreIndexUpgradeStatus.ERROR.toString(); break; } } } catch (Exception e) { log.error("Error while processing core: {}", coreName, e); - coreRxStatus = CoreReindexingStatus.ERROR; + response.upgradeStatus = CoreIndexUpgradeStatus.ERROR.toString(); } finally { // important to decrement searcher ref count after use since we obtained it via the // SolrCore.getSearcher() method @@ -176,38 +161,23 @@ public SolrJerseyResponse upgradeCoreIndex( // giving some time for 0 doc segments to clear up Thread.sleep(10000); } catch (InterruptedException ie) { - /* We don't need to preserve the interrupt here. - Otherwise, we may get immediately interrupted when we try to call sleep() during validation in the next steps. - And we are almost done anyway. - */ + Thread.currentThread().interrupt(); } boolean indexUpgraded = isIndexUpgraded(core); - /* - There is a delay observed sometimes between when a commit happens and - when the segment (with zero live docs) gets cleared. So adding a validation check with retries. - */ - for (int i = 0; i < RETRY_COUNT_FOR_SEGMENT_DELETION && !indexUpgraded; i++) { - try { - doCommit(core); - Thread.sleep(10000); - indexUpgraded = isIndexUpgraded(core); - - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - if (Thread.currentThread().isInterrupted()) { - break; - } - } - if (!indexUpgraded) { log.error( - "Validation failed for core '{}'. Some older version segments still remain (likely despite 100% deleted docs).", - coreName); - coreRxStatus = CoreReindexingStatus.ERROR; + "Validation failed for core '{}'. Some data is still present in the older (<{}.x) Lucene index format.", + coreName, + Version.LATEST.major); + response.upgradeStatus = CoreIndexUpgradeStatus.ERROR.toString(); } + + response.core = coreName; + response.upgradeStatus = CoreIndexUpgradeStatus.UPGRADE_SUCCESSFUL.toString(); + response.numSegmentsEligibleForUpgrade = numSegmentsEligibleForUpgrade; + response.numSegmentsUpgraded = numSegmentsUpgraded; } catch (IOException ioEx) { // TO-DO // Throw exception wrapped in CoreAdminAPIBaseException @@ -222,6 +192,7 @@ when the segment (with zero live docs) gets cleared. So adding a validation chec } } } + return response; }); } From 0f10339837e0552ccf36388b2f4bb78d3ae0ced1 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Thu, 1 Jan 2026 18:41:49 -0500 Subject: [PATCH 21/24] Wrap exceptions in CoreAdminAPIBaseException --- .../apache/solr/handler/admin/UpgradeCoreIndexOp.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java index a5b9a9effad..4502624ee82 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java @@ -2,6 +2,7 @@ import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; import org.apache.solr.client.api.model.UpgradeCoreIndexResponse; +import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.SolrParams; @@ -32,6 +33,14 @@ public boolean isExpensive() { @Override public void execute(CoreAdminHandler.CallInfo it) throws Exception { + + assert it.handler.coreContainer != null; + if (it.handler.coreContainer.isZooKeeperAware()) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "action=UPRGADECOREINDEX is not supported in SolrCloud mode. As an alternative, in order to upgrade index, configure LatestVersionMergePolicyFactory in solrconfig.xml and reindex the data in your collection."); + } + SolrParams params = it.req.getParams(); String cname = params.required().get(CoreAdminParams.CORE); final boolean isAsync = params.get(CommonAdminParams.ASYNC) != null; From b8b5deb2984038588539f4ba2606440473bd5841 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Thu, 1 Jan 2026 18:59:08 -0500 Subject: [PATCH 22/24] Wrap exceptions in CoreAdminAPIBaseException --- .../handler/admin/api/UpgradeCoreIndex.java | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index 02bfffc3ae0..6f8f0686106 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -22,6 +22,7 @@ import org.apache.lucene.util.Version; import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; import org.apache.solr.client.api.model.UpgradeCoreIndexResponse; +import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.UpdateParams; @@ -133,30 +134,24 @@ public UpgradeCoreIndexResponse upgradeCoreIndex( if (!shouldUpgradeSegment(lrc)) { continue; } - - boolean success = - processSegment(lrc, updateProcessorChain, core, ssearcherRef.get(), dvICache); - - if (success) { - numSegmentsUpgraded++; - } else { - response.upgradeStatus = CoreIndexUpgradeStatus.ERROR.toString(); - break; - } + processSegment(lrc, updateProcessorChain, core, ssearcherRef.get(), dvICache); + numSegmentsUpgraded++; } } catch (Exception e) { - log.error("Error while processing core: {}", coreName, e); - response.upgradeStatus = CoreIndexUpgradeStatus.ERROR.toString(); + log.error(String.format("Error while processing core: [%s]", coreName), e); + throw new CoreAdminAPIBaseException(e); } finally { // important to decrement searcher ref count after use since we obtained it via the // SolrCore.getSearcher() method ssearcherRef.decref(); } - // TO-DO - // Prepare SolrjerseyResponse if coreRxStatus==ERROR at this point + try { + doCommit(core); + } catch (IOException e) { + throw new CoreAdminAPIBaseException(e); + } - doCommit(core); try { // giving some time for 0 doc segments to clear up Thread.sleep(10000); @@ -171,16 +166,21 @@ public UpgradeCoreIndexResponse upgradeCoreIndex( "Validation failed for core '{}'. Some data is still present in the older (<{}.x) Lucene index format.", coreName, Version.LATEST.major); - response.upgradeStatus = CoreIndexUpgradeStatus.ERROR.toString(); + throw new CoreAdminAPIBaseException( + new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + String.format( + "Validation failed for core '%s'. Some data is still present in the older (<%d.x) Lucene index format.", + coreName, Version.LATEST.major))); } response.core = coreName; response.upgradeStatus = CoreIndexUpgradeStatus.UPGRADE_SUCCESSFUL.toString(); response.numSegmentsEligibleForUpgrade = numSegmentsEligibleForUpgrade; response.numSegmentsUpgraded = numSegmentsUpgraded; - } catch (IOException ioEx) { - // TO-DO - // Throw exception wrapped in CoreAdminAPIBaseException + } catch (Exception ioEx) { + throw new CoreAdminAPIBaseException(ioEx); + } finally { // Restore original merge policy if (iwRef != null) { @@ -224,8 +224,6 @@ private UpdateRequestProcessorChain getUpdateProcessorChain( "UPGRADECOREINDEX:: Requested update chain {} not found for core {}", requestedUpdateChain, core.getName()); - // TO-DO - // Throw exception wrapped in CoreAdminAPIBaseException } } @@ -255,7 +253,7 @@ private UpdateRequestProcessorChain getUpdateProcessorChain( return core.getUpdateProcessingChain(updateChainName); } - private boolean isIndexUpgraded(SolrCore core) { + private boolean isIndexUpgraded(SolrCore core) throws IOException { try (FSDirectory dir = FSDirectory.open(Paths.get(core.getIndexDir())); IndexReader reader = DirectoryReader.open(dir)) { @@ -291,11 +289,12 @@ private boolean isIndexUpgraded(SolrCore core) { "Error while opening segmentInfos for core: {}, exception: {}", core.getName(), e.toString()); + throw e; } - return false; + } - private void doCommit(SolrCore core) { + private void doCommit(SolrCore core) throws IOException { RefCounted iwRef = null; try { iwRef = core.getSolrCoreState().getIndexWriter(null); @@ -312,7 +311,8 @@ private void doCommit(SolrCore core) { } } catch (IOException ioEx) { log.warn( - String.format("Error commiting on core {} during index upgrade", core.getName()), ioEx); + String.format("Error committing on core [%s] during index upgrade", core.getName()), ioEx); + throw ioEx; } finally { if (iwRef != null) { iwRef.decref(); @@ -325,9 +325,10 @@ private boolean processSegment( UpdateRequestProcessorChain processorChain, SolrCore core, SolrIndexSearcher solrIndexSearcher, - DocValuesIteratorCache dvICache) { + DocValuesIteratorCache dvICache) + throws Exception { - boolean success = false; + boolean success; int numDocsProcessed = 0; String coreName = core.getName(); @@ -366,20 +367,21 @@ private boolean processSegment( } success = true; } catch (Exception e) { - log.error("Error in CvReindexingTask process() : {}", e.toString()); - + log.error("Error while processing segment [{}]", segmentReader.getSegmentName()); + throw e; } finally { if (processor != null) { try { processor.finish(); } catch (Exception e) { log.error("Exception while doing finish processor.finish() : {}", e.toString()); - + throw e; } finally { try { processor.close(); } catch (IOException e) { log.error("Exception while closing processor: {}", e.toString()); + throw e; } } } From 983f2d65c503aeee2c9debc3ed6717e0e493ff4e Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Thu, 1 Jan 2026 21:36:45 -0500 Subject: [PATCH 23/24] Handle exceptions in processSegment to preserve the main failure --- .../handler/admin/api/UpgradeCoreIndex.java | 64 +++++++++++++------ 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index 6f8f0686106..6155a72286c 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -138,7 +138,7 @@ public UpgradeCoreIndexResponse upgradeCoreIndex( numSegmentsUpgraded++; } } catch (Exception e) { - log.error(String.format("Error while processing core: [%s]", coreName), e); + log.error("Error while processing core: [{}}]", coreName, e); throw new CoreAdminAPIBaseException(e); } finally { // important to decrement searcher ref count after use since we obtained it via the @@ -291,7 +291,6 @@ private boolean isIndexUpgraded(SolrCore core) throws IOException { e.toString()); throw e; } - } private void doCommit(SolrCore core) throws IOException { @@ -310,8 +309,7 @@ private void doCommit(SolrCore core) throws IOException { log.warn("IWRef for core {} is null", core.getName()); } } catch (IOException ioEx) { - log.warn( - String.format("Error committing on core [%s] during index upgrade", core.getName()), ioEx); + log.warn("Error committing on core [{}}] during index upgrade", core.getName(), ioEx); throw ioEx; } finally { if (iwRef != null) { @@ -320,7 +318,7 @@ private void doCommit(SolrCore core) throws IOException { } } - private boolean processSegment( + private void processSegment( LeafReaderContext leafReaderContext, UpdateRequestProcessorChain processorChain, SolrCore core, @@ -328,7 +326,7 @@ private boolean processSegment( DocValuesIteratorCache dvICache) throws Exception { - boolean success; + Exception exceptionToThrow = null; int numDocsProcessed = 0; String coreName = core.getName(); @@ -365,28 +363,54 @@ private boolean processSegment( processor.processAdd(currDocCmd); numDocsProcessed++; } - success = true; } catch (Exception e) { - log.error("Error while processing segment [{}]", segmentReader.getSegmentName()); - throw e; + log.error( + "Error while processing segment [{}] in core [{}]", + segmentReader.getSegmentName(), + coreName, + e); + exceptionToThrow = e; } finally { if (processor != null) { try { processor.finish(); } catch (Exception e) { - log.error("Exception while doing finish processor.finish() : {}", e.toString()); - throw e; - } finally { - try { - processor.close(); - } catch (IOException e) { - log.error("Exception while closing processor: {}", e.toString()); - throw e; + log.error( + "Exception during processor.finish() for segment [{}] in core [{}]", + segmentReader.getSegmentName(), + coreName, + e); + if (exceptionToThrow == null) { + exceptionToThrow = e; + } else { + exceptionToThrow.addSuppressed(e); + } + } + try { + processor.close(); + } catch (Exception e) { + log.error( + "Exception while closing update processor for segment [{}] in core [{}]", + segmentReader.getSegmentName(), + coreName, + e); + if (exceptionToThrow == null) { + exceptionToThrow = e; + } else { + exceptionToThrow.addSuppressed(e); } } } if (solrRequest != null) { - solrRequest.close(); + try { + solrRequest.close(); + } catch (Exception e) { + if (exceptionToThrow == null) { + exceptionToThrow = e; + } else { + exceptionToThrow.addSuppressed(e); + } + } } } @@ -396,7 +420,9 @@ private boolean processSegment( coreName, numDocsProcessed); - return success; + if (exceptionToThrow != null) { + throw exceptionToThrow; + } } /* From d00571204816dfc58aa37524e1a7890237daa492 Mon Sep 17 00:00:00 2001 From: Rahul Goswami Date: Fri, 2 Jan 2026 03:15:18 -0500 Subject: [PATCH 24/24] keeping validateLogCalls and forbidden apis check happy --- .../handler/admin/api/UpgradeCoreIndex.java | 34 ++++++++----------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java index 6155a72286c..765c74b273b 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -2,7 +2,7 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; -import java.nio.file.Paths; +import java.nio.file.Path; import java.util.List; import java.util.Set; import org.apache.lucene.document.Document; @@ -87,7 +87,7 @@ public UpgradeCoreIndexResponse upgradeCoreIndex( () -> { try (SolrCore core = coreContainer.getCore(coreName)) { - log.info("Received UPGRADECOREINDEX request for core: {}", core.getName()); + log.info("Received UPGRADECOREINDEX request for core: {}", coreName); // Set LatestVersionMergePolicy to prevent older segments from // participating in merges while we reindex. This is to prevent any older version @@ -169,9 +169,11 @@ public UpgradeCoreIndexResponse upgradeCoreIndex( throw new CoreAdminAPIBaseException( new SolrException( SolrException.ErrorCode.SERVER_ERROR, - String.format( - "Validation failed for core '%s'. Some data is still present in the older (<%d.x) Lucene index format.", - coreName, Version.LATEST.major))); + "Validation failed for core '" + + coreName + + "'. Some data is still present in the older (<" + + Version.LATEST.major + + ".x) Lucene index format.")); } response.core = coreName; @@ -255,7 +257,7 @@ private UpdateRequestProcessorChain getUpdateProcessorChain( private boolean isIndexUpgraded(SolrCore core) throws IOException { - try (FSDirectory dir = FSDirectory.open(Paths.get(core.getIndexDir())); + try (FSDirectory dir = FSDirectory.open(Path.of(core.getIndexDir())); IndexReader reader = DirectoryReader.open(dir)) { List leaves = reader.leaves(); @@ -285,10 +287,7 @@ private boolean isIndexUpgraded(SolrCore core) throws IOException { } return true; } catch (Exception e) { - log.error( - "Error while opening segmentInfos for core: {}, exception: {}", - core.getName(), - e.toString()); + log.error("Error while opening segmentInfos for core [{}]", core.getName(), e); throw e; } } @@ -309,7 +308,7 @@ private void doCommit(SolrCore core) throws IOException { log.warn("IWRef for core {} is null", core.getName()); } } catch (IOException ioEx) { - log.warn("Error committing on core [{}}] during index upgrade", core.getName(), ioEx); + log.warn("Error committing on core [{}] during index upgrade", core.getName(), ioEx); throw ioEx; } finally { if (iwRef != null) { @@ -334,6 +333,7 @@ private void processSegment( LeafReader leafReader = FilterLeafReader.unwrap(leafReaderContext.reader()); SegmentReader segmentReader = (SegmentReader) leafReader; + final String segmentName = segmentReader.getSegmentName(); Bits bits = segmentReader.getLiveDocs(); SolrInputDocument solrDoc = null; UpdateRequestProcessor processor = null; @@ -364,11 +364,7 @@ private void processSegment( numDocsProcessed++; } } catch (Exception e) { - log.error( - "Error while processing segment [{}] in core [{}]", - segmentReader.getSegmentName(), - coreName, - e); + log.error("Error while processing segment [{}] in core [{}]", segmentName, coreName, e); exceptionToThrow = e; } finally { if (processor != null) { @@ -377,7 +373,7 @@ private void processSegment( } catch (Exception e) { log.error( "Exception during processor.finish() for segment [{}] in core [{}]", - segmentReader.getSegmentName(), + segmentName, coreName, e); if (exceptionToThrow == null) { @@ -391,7 +387,7 @@ private void processSegment( } catch (Exception e) { log.error( "Exception while closing update processor for segment [{}] in core [{}]", - segmentReader.getSegmentName(), + segmentName, coreName, e); if (exceptionToThrow == null) { @@ -416,7 +412,7 @@ private void processSegment( log.info( "End processing segment : {}, core: {} docs processed: {}", - segmentReader.getSegmentName(), + segmentName, coreName, numDocsProcessed);