diff --git a/changelog/unreleased/SOLR-17725-CoreAdmin-API-to-upgrade-an-index-in-place.yml b/changelog/unreleased/SOLR-17725-CoreAdmin-API-to-upgrade-an-index-in-place.yml new file mode 100644 index 000000000000..0ac334f8b566 --- /dev/null +++ b/changelog/unreleased/SOLR-17725-CoreAdmin-API-to-upgrade-an-index-in-place.yml @@ -0,0 +1,9 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: CoreAdmin API (/admin/cores?action=UPGRADECOREINDEX) to upgrade an index in-place +type: added +authors: + - name: Rahul Goswami +links: + - name: SOLR-17725 + url: https://issues.apache.org/jira/browse/SOLR-17725 + diff --git a/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexRequestBody.java b/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexRequestBody.java new file mode 100644 index 000000000000..0635db57fcdb --- /dev/null +++ b/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexRequestBody.java @@ -0,0 +1,13 @@ +package org.apache.solr.client.api.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.media.Schema; + +public class UpgradeCoreIndexRequestBody { + + @Schema(description = "Request ID to track this action which will be processed asynchronously.") + @JsonProperty + public String async; + + @JsonProperty public String updateChain; +} diff --git a/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexResponse.java b/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexResponse.java new file mode 100644 index 000000000000..f96d6a541732 --- /dev/null +++ b/solr/api/src/java/org/apache/solr/client/api/model/UpgradeCoreIndexResponse.java @@ -0,0 +1,22 @@ +package org.apache.solr.client.api.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.v3.oas.annotations.media.Schema; + +public class UpgradeCoreIndexResponse extends SolrJerseyResponse { + @Schema(description = "The name of the core.") + @JsonProperty + public String core; + + @Schema(description = "The total number of segments eligible for upgrade.") + @JsonProperty + public Integer numSegmentsEligibleForUpgrade; + + @Schema(description = "The number of segments successfully upgraded.") + @JsonProperty + public Integer numSegmentsUpgraded; + + @Schema(description = "Status of the core index upgrade operation.") + @JsonProperty + public String upgradeStatus; +} diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java index 0a15462f13fe..3cd2bb141e73 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java @@ -38,6 +38,7 @@ import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.STATUS; import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.SWAP; import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.UNLOAD; +import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.UPGRADECOREINDEX; import static org.apache.solr.handler.admin.CoreAdminHandler.CallInfo; import java.lang.invoke.MethodHandles; @@ -256,7 +257,8 @@ public enum CoreAdminOperation implements CoreAdminOp { final ListCoreSnapshotsResponse response = coreSnapshotAPI.listSnapshots(coreName); V2ApiUtils.squashIntoSolrResponseWithoutHeader(it.rsp, response); - }); + }), + UPGRADECOREINDEX_OP(UPGRADECOREINDEX, new UpgradeCoreIndexOp()); final CoreAdminParams.CoreAdminAction action; final CoreAdminOp fun; diff --git a/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java new file mode 100644 index 000000000000..4502624ee824 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/admin/UpgradeCoreIndexOp.java @@ -0,0 +1,66 @@ +package org.apache.solr.handler.admin; + +import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; +import org.apache.solr.client.api.model.UpgradeCoreIndexResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CommonAdminParams; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.handler.admin.api.UpgradeCoreIndex; +import org.apache.solr.handler.api.V2ApiUtils; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; + +public class UpgradeCoreIndexOp implements CoreAdminHandler.CoreAdminOp { + @FunctionalInterface + public interface UpgradeCoreIndexFactory { + UpgradeCoreIndex create( + CoreContainer coreContainer, + CoreAdminHandler.CoreAdminAsyncTracker coreAdminAsyncTracker, + SolrQueryRequest req, + SolrQueryResponse rsp); + } + + static UpgradeCoreIndexFactory UPGRADE_CORE_INDEX_FACTORY = UpgradeCoreIndex::new; + + @Override + public boolean isExpensive() { + return true; + } + + @Override + public void execute(CoreAdminHandler.CallInfo it) throws Exception { + + assert it.handler.coreContainer != null; + if (it.handler.coreContainer.isZooKeeperAware()) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "action=UPRGADECOREINDEX is not supported in SolrCloud mode. As an alternative, in order to upgrade index, configure LatestVersionMergePolicyFactory in solrconfig.xml and reindex the data in your collection."); + } + + SolrParams params = it.req.getParams(); + String cname = params.required().get(CoreAdminParams.CORE); + final boolean isAsync = params.get(CommonAdminParams.ASYNC) != null; + final var requestBody = new UpgradeCoreIndexRequestBody(); + requestBody.updateChain = params.get(UpdateParams.UPDATE_CHAIN); + + UpgradeCoreIndex upgradeCoreIndexApi = + UPGRADE_CORE_INDEX_FACTORY.create( + it.handler.coreContainer, it.handler.coreAdminAsyncTracker, it.req, it.rsp); + final UpgradeCoreIndexResponse response = + upgradeCoreIndexApi.upgradeCoreIndex(cname, requestBody); + V2ApiUtils.squashIntoSolrResponseWithoutHeader(it.rsp, response); + + if (isAsync) { + final var opResponse = new NamedList<>(); + V2ApiUtils.squashIntoNamedListWithoutHeader(opResponse, response); + // REQUESTSTATUS is returning the inner response NamedList as a positional array + // ([k1,v1,k2,v2...]). + // so converting to a map + it.rsp.addResponse(opResponse.asMap(1)); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java new file mode 100644 index 000000000000..765c74b273b4 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/UpgradeCoreIndex.java @@ -0,0 +1,455 @@ +package org.apache.solr.handler.admin.api; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.file.Path; +import java.util.List; +import java.util.Set; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterLeafReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.StoredFields; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.Version; +import org.apache.solr.client.api.model.UpgradeCoreIndexRequestBody; +import org.apache.solr.client.api.model.UpgradeCoreIndexResponse; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; +import org.apache.solr.handler.RequestHandlerBase; +import org.apache.solr.handler.admin.CoreAdminHandler; +import org.apache.solr.index.LatestVersionMergePolicy; +import org.apache.solr.request.LocalSolrQueryRequest; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.request.SolrRequestHandler; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.schema.SchemaField; +import org.apache.solr.search.DocValuesIteratorCache; +import org.apache.solr.search.SolrDocumentFetcher; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.update.AddUpdateCommand; +import org.apache.solr.update.processor.UpdateRequestProcessor; +import org.apache.solr.update.processor.UpdateRequestProcessorChain; +import org.apache.solr.util.RefCounted; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpgradeCoreIndex extends CoreAdminAPIBase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public enum CoreIndexUpgradeStatus { + UPGRADE_SUCCESSFUL, + ERROR, + NO_UPGRADE_NEEDED; + } + + private static final int RETRY_COUNT_FOR_SEGMENT_DELETION = 5; + + public UpgradeCoreIndex( + CoreContainer coreContainer, + CoreAdminHandler.CoreAdminAsyncTracker coreAdminAsyncTracker, + SolrQueryRequest req, + SolrQueryResponse rsp) { + super(coreContainer, coreAdminAsyncTracker, req, rsp); + } + + @Override + public boolean isExpensive() { + return true; + } + + public UpgradeCoreIndexResponse upgradeCoreIndex( + String coreName, UpgradeCoreIndexRequestBody requestBody) throws Exception { + ensureRequiredParameterProvided("coreName", coreName); + + final UpgradeCoreIndexResponse response = + instantiateJerseyResponse(UpgradeCoreIndexResponse.class); + + return handlePotentiallyAsynchronousTask( + response, + coreName, + requestBody.async, + "upgrade-index", + () -> { + try (SolrCore core = coreContainer.getCore(coreName)) { + + log.info("Received UPGRADECOREINDEX request for core: {}", coreName); + + // Set LatestVersionMergePolicy to prevent older segments from + // participating in merges while we reindex. This is to prevent any older version + // segments from + // merging with any newly formed segments created due to reindexing and undoing the work + // we are doing. + RefCounted iwRef = null; + MergePolicy originalMergePolicy = null; + int numSegmentsEligibleForUpgrade = 0, numSegmentsUpgraded = 0; + try { + iwRef = core.getSolrCoreState().getIndexWriter(core); + if (iwRef != null) { + IndexWriter iw = iwRef.get(); + if (iw != null) { + originalMergePolicy = iw.getConfig().getMergePolicy(); + iw.getConfig() + .setMergePolicy( + new LatestVersionMergePolicy(iw.getConfig().getMergePolicy())); + } + } + + RefCounted ssearcherRef = core.getSearcher(); + try { + List leafContexts = + ssearcherRef.get().getTopReaderContext().leaves(); + DocValuesIteratorCache dvICache = new DocValuesIteratorCache(ssearcherRef.get()); + + UpdateRequestProcessorChain updateProcessorChain = + getUpdateProcessorChain(core, requestBody.updateChain); + + for (LeafReaderContext lrc : leafContexts) { + if (shouldUpgradeSegment(lrc)) { + numSegmentsEligibleForUpgrade++; + } + } + if (numSegmentsEligibleForUpgrade == 0) { + response.core = coreName; + response.upgradeStatus = CoreIndexUpgradeStatus.NO_UPGRADE_NEEDED.toString(); + response.numSegmentsEligibleForUpgrade = 0; + return response; + } + + for (LeafReaderContext lrc : leafContexts) { + if (!shouldUpgradeSegment(lrc)) { + continue; + } + processSegment(lrc, updateProcessorChain, core, ssearcherRef.get(), dvICache); + numSegmentsUpgraded++; + } + } catch (Exception e) { + log.error("Error while processing core: [{}}]", coreName, e); + throw new CoreAdminAPIBaseException(e); + } finally { + // important to decrement searcher ref count after use since we obtained it via the + // SolrCore.getSearcher() method + ssearcherRef.decref(); + } + + try { + doCommit(core); + } catch (IOException e) { + throw new CoreAdminAPIBaseException(e); + } + + try { + // giving some time for 0 doc segments to clear up + Thread.sleep(10000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + + boolean indexUpgraded = isIndexUpgraded(core); + + if (!indexUpgraded) { + log.error( + "Validation failed for core '{}'. Some data is still present in the older (<{}.x) Lucene index format.", + coreName, + Version.LATEST.major); + throw new CoreAdminAPIBaseException( + new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Validation failed for core '" + + coreName + + "'. Some data is still present in the older (<" + + Version.LATEST.major + + ".x) Lucene index format.")); + } + + response.core = coreName; + response.upgradeStatus = CoreIndexUpgradeStatus.UPGRADE_SUCCESSFUL.toString(); + response.numSegmentsEligibleForUpgrade = numSegmentsEligibleForUpgrade; + response.numSegmentsUpgraded = numSegmentsUpgraded; + } catch (Exception ioEx) { + throw new CoreAdminAPIBaseException(ioEx); + + } finally { + // Restore original merge policy + if (iwRef != null) { + IndexWriter iw = iwRef.get(); + if (iw != null && originalMergePolicy != null) { + iw.getConfig().setMergePolicy(originalMergePolicy); + } + iwRef.decref(); + } + } + } + + return response; + }); + } + + private boolean shouldUpgradeSegment(LeafReaderContext lrc) { + Version segmentMinVersion = null; + + LeafReader leafReader = lrc.reader(); + leafReader = FilterLeafReader.unwrap(leafReader); + + SegmentCommitInfo si = ((SegmentReader) leafReader).getSegmentInfo(); + segmentMinVersion = si.info.getMinVersion(); + + return (segmentMinVersion == null || segmentMinVersion.major < Version.LATEST.major); + } + + @SuppressWarnings({"rawtypes"}) + private UpdateRequestProcessorChain getUpdateProcessorChain( + SolrCore core, String requestedUpdateChain) { + + // Try explicitly requested chain first + if (requestedUpdateChain != null) { + UpdateRequestProcessorChain resolvedChain = + core.getUpdateProcessingChain(requestedUpdateChain); + if (resolvedChain != null) { + return resolvedChain; + } else { + log.error( + "UPGRADECOREINDEX:: Requested update chain {} not found for core {}", + requestedUpdateChain, + core.getName()); + } + } + + // Try to find chain configured in /update handler + String updateChainName = null; + SolrRequestHandler reqHandler = core.getRequestHandler("/update"); + + NamedList initArgs = ((RequestHandlerBase) reqHandler).getInitArgs(); + + if (initArgs != null) { + // Check invariants first + Object invariants = initArgs.get("invariants"); + if (invariants instanceof NamedList) { + updateChainName = (String) ((NamedList) invariants).get(UpdateParams.UPDATE_CHAIN); + } + + // Check defaults if not found in invariants + if (updateChainName == null) { + Object defaults = initArgs.get("defaults"); + if (defaults instanceof NamedList) { + updateChainName = (String) ((NamedList) defaults).get(UpdateParams.UPDATE_CHAIN); + } + } + } + + // default chain is returned if updateChainName is null + return core.getUpdateProcessingChain(updateChainName); + } + + private boolean isIndexUpgraded(SolrCore core) throws IOException { + + try (FSDirectory dir = FSDirectory.open(Path.of(core.getIndexDir())); + IndexReader reader = DirectoryReader.open(dir)) { + + List leaves = reader.leaves(); + if (leaves == null || leaves.isEmpty()) { + // no segments to process/validate + return true; + } + + for (LeafReaderContext lrc : leaves) { + LeafReader leafReader = lrc.reader(); + leafReader = FilterLeafReader.unwrap(leafReader); + if (leafReader instanceof SegmentReader) { + SegmentReader segmentReader = (SegmentReader) leafReader; + SegmentCommitInfo si = segmentReader.getSegmentInfo(); + Version segMinVersion = si.info.getMinVersion(); + if (segMinVersion == null || segMinVersion.major != Version.LATEST.major) { + log.warn( + "isIndexUpgraded(): Core: {}, Segment [{}] is still at minVersion [{}] and is not updated to the latest version [{}]; numLiveDocs: [{}]", + core.getName(), + si.info.name, + (segMinVersion == null ? 6 : segMinVersion.major), + Version.LATEST.major, + segmentReader.numDocs()); + return false; + } + } + } + return true; + } catch (Exception e) { + log.error("Error while opening segmentInfos for core [{}]", core.getName(), e); + throw e; + } + } + + private void doCommit(SolrCore core) throws IOException { + RefCounted iwRef = null; + try { + iwRef = core.getSolrCoreState().getIndexWriter(null); + if (iwRef != null) { + IndexWriter iw = iwRef.get(); + + if (iw != null) { + iw.commit(); + } else { + log.warn("IndexWriter for core {} is null", core.getName()); + } + } else { + log.warn("IWRef for core {} is null", core.getName()); + } + } catch (IOException ioEx) { + log.warn("Error committing on core [{}] during index upgrade", core.getName(), ioEx); + throw ioEx; + } finally { + if (iwRef != null) { + iwRef.decref(); + } + } + } + + private void processSegment( + LeafReaderContext leafReaderContext, + UpdateRequestProcessorChain processorChain, + SolrCore core, + SolrIndexSearcher solrIndexSearcher, + DocValuesIteratorCache dvICache) + throws Exception { + + Exception exceptionToThrow = null; + int numDocsProcessed = 0; + + String coreName = core.getName(); + IndexSchema indexSchema = core.getLatestSchema(); + + LeafReader leafReader = FilterLeafReader.unwrap(leafReaderContext.reader()); + SegmentReader segmentReader = (SegmentReader) leafReader; + final String segmentName = segmentReader.getSegmentName(); + Bits bits = segmentReader.getLiveDocs(); + SolrInputDocument solrDoc = null; + UpdateRequestProcessor processor = null; + LocalSolrQueryRequest solrRequest = null; + SolrDocumentFetcher docFetcher = solrIndexSearcher.getDocFetcher(); + try { + // Exclude copy field targets to avoid duplicating values on reindex + Set nonStoredDVFields = docFetcher.getNonStoredDVsWithoutCopyTargets(); + solrRequest = new LocalSolrQueryRequest(core, new ModifiableSolrParams()); + + SolrQueryResponse rsp = new SolrQueryResponse(); + processor = processorChain.createProcessor(solrRequest, rsp); + StoredFields storedFields = segmentReader.storedFields(); + for (int luceneDocId = 0; luceneDocId < segmentReader.maxDoc(); luceneDocId++) { + if (bits != null && !bits.get(luceneDocId)) { + continue; + } + + Document doc = storedFields.document(luceneDocId); + solrDoc = toSolrInputDocument(doc, indexSchema); + + docFetcher.decorateDocValueFields( + solrDoc, leafReaderContext.docBase + luceneDocId, nonStoredDVFields, dvICache); + solrDoc.removeField("_version_"); + AddUpdateCommand currDocCmd = new AddUpdateCommand(solrRequest); + currDocCmd.solrDoc = solrDoc; + processor.processAdd(currDocCmd); + numDocsProcessed++; + } + } catch (Exception e) { + log.error("Error while processing segment [{}] in core [{}]", segmentName, coreName, e); + exceptionToThrow = e; + } finally { + if (processor != null) { + try { + processor.finish(); + } catch (Exception e) { + log.error( + "Exception during processor.finish() for segment [{}] in core [{}]", + segmentName, + coreName, + e); + if (exceptionToThrow == null) { + exceptionToThrow = e; + } else { + exceptionToThrow.addSuppressed(e); + } + } + try { + processor.close(); + } catch (Exception e) { + log.error( + "Exception while closing update processor for segment [{}] in core [{}]", + segmentName, + coreName, + e); + if (exceptionToThrow == null) { + exceptionToThrow = e; + } else { + exceptionToThrow.addSuppressed(e); + } + } + } + if (solrRequest != null) { + try { + solrRequest.close(); + } catch (Exception e) { + if (exceptionToThrow == null) { + exceptionToThrow = e; + } else { + exceptionToThrow.addSuppressed(e); + } + } + } + } + + log.info( + "End processing segment : {}, core: {} docs processed: {}", + segmentName, + coreName, + numDocsProcessed); + + if (exceptionToThrow != null) { + throw exceptionToThrow; + } + } + + /* + * Convert a lucene Document to a SolrInputDocument + */ + protected SolrInputDocument toSolrInputDocument( + org.apache.lucene.document.Document doc, IndexSchema schema) { + SolrInputDocument out = new SolrInputDocument(); + for (IndexableField f : doc.getFields()) { + String fname = f.name(); + SchemaField sf = schema.getFieldOrNull(f.name()); + Object val = null; + if (sf != null) { + if ((!sf.hasDocValues() && !sf.stored()) || schema.isCopyFieldTarget(sf)) { + continue; + } + val = sf.getType().toObject(f); + } else { + val = f.stringValue(); + if (val == null) { + val = f.numericValue(); + } + if (val == null) { + val = f.binaryValue(); + } + if (val == null) { + val = f; + } + } + out.addField(fname, val); + } + return out; + } +} diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java index ea2206421240..ad41867cc31b 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java @@ -178,7 +178,8 @@ public enum CoreAdminAction { INSTALLCOREDATA, CREATESNAPSHOT, DELETESNAPSHOT, - LISTSNAPSHOTS; + LISTSNAPSHOTS, + UPGRADECOREINDEX; public final boolean isRead;