33import java .io .IOException ;
44import java .lang .invoke .MethodHandles ;
55import java .nio .file .Paths ;
6- import java .time .Duration ;
76import java .time .LocalDateTime ;
87import java .time .ZoneOffset ;
98import java .time .format .DateTimeFormatter ;
1413import java .util .Map ;
1514import java .util .Set ;
1615import java .util .UUID ;
17- import org .apache .commons .lang3 .time .DurationFormatUtils ;
1816import org .apache .lucene .document .Document ;
1917import org .apache .lucene .index .DirectoryReader ;
2018import org .apache .lucene .index .FilterLeafReader ;
@@ -113,8 +111,6 @@ public enum ReindexingThreadState {
113111 private static final DateTimeFormatter formatter =
114112 DateTimeFormatter .ofPattern ("dd-MM-yyyy HH:mm:ss" );
115113
116- private SolrInputDocument lastDoc ;
117-
118114 public UpgradeCoreIndex (
119115 CoreContainer coreContainer ,
120116 CoreAdminHandler .CoreAdminAsyncTracker coreAdminAsyncTracker ,
@@ -141,99 +137,42 @@ public SolrJerseyResponse upgradeCoreIndex(
141137 () -> {
142138 try (SolrCore core = coreContainer .getCore (coreName )) {
143139
144- log .warn ( "Processing core: {}" , core .getName ());
140+ log .info ( "Received UPGRADECOREINDEX request for core: {}" , core .getName ());
145141 CoreReindexingStatus coreRxStatus = CoreReindexingStatus .REINDEXING_ACTIVE ;
146-
147142 String indexDir = core .getIndexDir ();
148-
149- log .info ("Starting to process core: {}" , coreName );
150-
151143 RefCounted <SolrIndexSearcher > ssearcherRef = core .getSearcher ();
152144 List <LeafReaderContext > leafContexts =
153145 ssearcherRef .get ().getTopReaderContext ().leaves ();
154146 DocValuesIteratorCache dvICache = new DocValuesIteratorCache (ssearcherRef .get ());
155147
156- Map <String , Long > segmentsToUpgrade = getSegmentsToUpgrade (indexDir );
157-
158- log .info ("Segments to upgrade: {}" , segmentsToUpgrade .toString ());
159-
160- setLastDoc (null );
161-
162148 UpdateRequestProcessorChain updateProcessorChain =
163149 getUpdateProcessorChain (core , requestBody .updateChain );
164150
165151 try {
166152
167- for (int segmentIndex = 0 , c = leafContexts .size ();
168- segmentIndex < c ;
169- segmentIndex ++) {
170- LeafReaderContext lrc = leafContexts .get (segmentIndex );
171- LeafReader leafReader = lrc .reader ();
172- leafReader = FilterLeafReader .unwrap (leafReader );
173- log .debug (
174- "LeafReader hashcode: {}, getCreatedVersionMajor: {}, getMinVersion:{} " ,
175- leafReader .hashCode (),
176- leafReader .getMetaData ().createdVersionMajor (),
177- leafReader .getMetaData ().minVersion ());
178-
179- SegmentReader segmentReader = (SegmentReader ) leafReader ;
180- String currentSegmentName = segmentReader .getSegmentName ();
181-
182- if (segmentsToUpgrade .containsKey (currentSegmentName )) {
183- boolean segmentError = true ;
184- LocalDateTime segmentRxStartTime = LocalDateTime .now ();
185- LocalDateTime segmentRxStopTime = LocalDateTime .MAX ;
186-
187- for (int i = 0 ; i < SEGMENT_ERROR_RETRIES ; i ++) {
188- // retrying segment; I anticipate throttling to be the main reason in most
189- // cases
190- // hence the sleep
191- if (i > 0 ) {
192- Thread .sleep (5 * 60 * 1000 ); // 5 minutes
193- }
153+ for (LeafReaderContext lrc : leafContexts ) {
154+ if (!shouldUpgradeSegment (lrc )) {
155+ continue ;
156+ }
194157
195- log .info (
196- "Start processSegment run: {}, segment: {} at {}" ,
197- i ,
198- segmentReader .getSegmentName (),
199- formatter .format (segmentRxStartTime ));
200-
201- segmentError =
202- processSegment (
203- segmentReader ,
204- leafContexts ,
205- segmentIndex ,
206- updateProcessorChain ,
207- core ,
208- dvICache );
209-
210- segmentRxStopTime = LocalDateTime .now ();
211- log .info (
212- "End processSegment run: {}, segment: {} at {}" ,
213- i ,
214- segmentReader .getSegmentName (),
215- formatter .format (segmentRxStopTime ));
216- if (!segmentError ) {
217- break ;
218- }
219- }
220- if (segmentError ) {
221- coreRxStatus = CoreReindexingStatus .ERROR ;
222- log .error (
223- "processSegment returned : {} for segment : {}" ,
224- segmentError ,
225- segmentReader .getSegmentName ());
158+ boolean segmentError = true ;
159+
160+ for (int i = 0 ; i < SEGMENT_ERROR_RETRIES ; i ++) {
161+ // retrying segment; I anticipate throttling to be the main reason in most
162+ // cases
163+ // hence the sleep
164+ if (i > 0 ) {
165+ Thread .sleep (5 * 60 * 1000 ); // 5 minutes
226166 }
227167
228- log .info (
229- "Segment: {} Elapsed time: {}, start time: {}, stop time: {}" ,
230- segmentReader .getSegmentName (),
231- DurationFormatUtils .formatDuration (
232- Duration .between (segmentRxStartTime , segmentRxStopTime ).toMillis (),
233- "**H:mm:ss**" ,
234- true ),
235- formatter .format (segmentRxStartTime ),
236- formatter .format (segmentRxStopTime ));
168+ segmentError = processSegment (lrc , updateProcessorChain , core , dvICache );
169+
170+ if (!segmentError ) {
171+ break ;
172+ }
173+ }
174+ if (segmentError ) {
175+ coreRxStatus = CoreReindexingStatus .ERROR ;
237176 }
238177 }
239178 } catch (Exception e ) {
@@ -333,6 +272,17 @@ public SolrJerseyResponse upgradeCoreIndex(
333272 });
334273 }
335274
275+ private boolean shouldUpgradeSegment (LeafReaderContext lrc ) {
276+ Version segmentMinVersion = null ;
277+ try (LeafReader leafReader = lrc .reader ()) {
278+ segmentMinVersion = leafReader .getMetaData ().minVersion ();
279+ } catch (IOException ex ) {
280+ // TO-DO
281+ // Wrap exception in CoreAdminAPIBaseException
282+ }
283+ return (segmentMinVersion == null || segmentMinVersion .major < Version .LATEST .major );
284+ }
285+
336286 private int getIndexCreatedVersionMajor (SolrCore core ) {
337287 int indexCreatedVersionMajor = 0 ;
338288 try (FSDirectory dir = FSDirectory .open (Paths .get (core .getIndexDir ()))) {
@@ -437,38 +387,15 @@ private Boolean validateSegmentsUpdated(SolrCore core) {
437387
438388 private void doCommit (SolrCore core ) {
439389 try {
440- SolrInputDocument dummyDoc = null ;
441- SolrInputDocument lastDoc = getLastDoc ();
442- if (lastDoc == null ) {
443- // set dummy doc for commit to take effect especially in case of 0-doc cores
444- dummyDoc = getDummyDoc (core );
445- lastDoc = dummyDoc ;
446- }
447-
448390 UpdateRequest updateReq = new UpdateRequest ();
449- updateReq .add (lastDoc );
450- if (log .isDebugEnabled ()) {
451- log .debug ("Last solr Doc keySet: {}" , lastDoc .keySet ().toString ());
452- }
391+
453392 ModifiableSolrParams msp = new ModifiableSolrParams ();
454393
455394 msp .add ("commit" , "true" );
456395 LocalSolrQueryRequest solrReq ;
457396 solrReq = getLocalUpdateReq (updateReq , core , msp );
458- updateReq .getDocumentsMap ().clear ();
459- if (log .isDebugEnabled ()) {
460- log .debug (
461- "Calling commit. Solr params: {}, CvReindexingTask.getLastDoc(): {}" ,
462- msp .toString (),
463- lastDoc .toString ());
464- }
465397 doLocalUpdateReq (solrReq , core );
466398
467- if (dummyDoc != null ) {
468- deleteDummyDocAndCommit (
469- core ,
470- (String ) dummyDoc .getFieldValue (core .getLatestSchema ().getUniqueKeyField ().getName ()));
471- }
472399 } catch (Exception e ) {
473400 log .error (
474401 "Error while sending update request to advance index created version {}" , e .toString ());
@@ -533,10 +460,6 @@ private SolrInputDocument getDummyDoc(SolrCore core) {
533460 return dummyDoc ;
534461 }
535462
536- private void setLastDoc (SolrInputDocument solrDoc ) {
537- lastDoc = solrDoc ;
538- }
539-
540463 private static Map <String , Long > getSegmentsToUpgrade (String indexDir ) {
541464 Map <String , Long > segmentsToUpgrade = new LinkedHashMap <>();
542465 try (Directory dir = FSDirectory .open (Paths .get (indexDir ));
@@ -564,18 +487,19 @@ private static Map<String, Long> getSegmentsToUpgrade(String indexDir) {
564487 }
565488
566489 private boolean processSegment (
567- SegmentReader segmentReader ,
568- List <LeafReaderContext > leafContexts ,
569- int segmentIndex ,
490+ LeafReaderContext leafReaderContext ,
570491 UpdateRequestProcessorChain processorChain ,
571492 SolrCore core ,
572493 DocValuesIteratorCache dvICache ) {
573494
574495 boolean segmentError = false ;
575496 int numDocsProcessed = 0 ;
576- int numDocsAccum = 0 ;
497+
577498 String coreName = core .getName ();
578499 IndexSchema indexSchema = core .getLatestSchema ();
500+
501+ LeafReader leafReader = FilterLeafReader .unwrap (leafReaderContext .reader ());
502+ SegmentReader segmentReader = (SegmentReader ) leafReader ;
579503 Bits bits = segmentReader .getLiveDocs ();
580504 SolrInputDocument solrDoc = null ;
581505 UpdateRequestProcessor processor = null ;
@@ -599,13 +523,12 @@ private boolean processSegment(
599523 solrDoc = toSolrInputDocument (doc , indexSchema );
600524
601525 docFetcher .decorateDocValueFields (
602- solrDoc , leafContexts . get ( segmentIndex ) .docBase + luceneDocId , fields , dvICache );
526+ solrDoc , leafReaderContext .docBase + luceneDocId , fields , dvICache );
603527 solrDoc .removeField ("_version_" );
604528 AddUpdateCommand currDocCmd = new AddUpdateCommand (solrRequest );
605529 currDocCmd .solrDoc = solrDoc ;
606530 processor .processAdd (currDocCmd );
607531 numDocsProcessed ++;
608- numDocsAccum ++;
609532 }
610533 } catch (Exception e ) {
611534 log .error ("Error in CvReindexingTask process() : {}" , e .toString ());
@@ -631,10 +554,7 @@ private boolean processSegment(
631554 }
632555 searcherRef .decref ();
633556 }
634- if (!segmentError && solrDoc != null ) {
635- setLastDoc (new SolrInputDocument (solrDoc ));
636- getLastDoc ().removeField ("_version_" );
637- }
557+
638558 log .info (
639559 "End processing segment : {}, core: {} docs processed: {}" ,
640560 segmentReader .getSegmentName (),
@@ -644,10 +564,6 @@ private boolean processSegment(
644564 return segmentError ;
645565 }
646566
647- public SolrInputDocument getLastDoc () {
648- return lastDoc ;
649- }
650-
651567 /*
652568 * Convert a lucene Document to a SolrInputDocument
653569 */
0 commit comments