@@ -78,6 +78,14 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle {
7878 new FloatConfOption ("giraph.garbageEstimator.manualGCPressure" , 0.95f ,
7979 "The threshold above which GC is called manually if Full GC has not " +
8080 "happened in a while" );
81+ /**
82+ * Minimum time, in milliseconds, that must have passed since the last full
83+ * GC before GC is called manually (only done when memory is tight).
84+ */
85+ public static final LongConfOption MANUAL_GC_INTERVAL =
86+ new LongConfOption ("giraph.memoryEstimator.manualGCInterval" , 20000 ,
87+ "The amount of time passed from the last full GC to call GC " +
88+ "manually if memory is tight (in milliseconds)" );
8189 /** Used to detect a high memory pressure situation */
8290 public static final FloatConfOption GC_MINIMUM_RECLAIM_FRACTION =
8391 new FloatConfOption ("giraph.garbageEstimator.gcReclaimFraction" , 0.05f ,
@@ -140,6 +148,11 @@ public class MemoryEstimatorOracle implements OutOfCoreOracle {
140148 private volatile State state = State .STABLE ;
141149 /** Timestamp of the last major GC */
142150 private volatile long lastMajorGCTime = 0 ;
151+ /**
152+ * Lock that keeps a reset of the memory estimator (clear + set superstep)
153+ * from interleaving with a new record being added to that estimator.
154+ */
155+ private Lock lock = new ReentrantLock ();
143156
144157 /**
145158 * Different states the OOC can be in.
@@ -172,6 +185,7 @@ public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf,
172185 this .oocThreshold = OOC_THRESHOLD .get (conf );
173186
174187 final long checkMemoryInterval = CHECK_MEMORY_INTERVAL .get (conf );
188+ final long manualGCInterval = MANUAL_GC_INTERVAL .get (conf );
175189
176190 ThreadUtils .startThread (new Runnable () {
177191 @ Override
@@ -183,9 +197,9 @@ public void run() {
183197 updateRates (oldGenUsageEstimate , usage .getMax ());
184198 } else {
185199 long time = System .currentTimeMillis ();
186- if (time - lastMajorGCTime >= 10000 ) {
200+ if (time - lastMajorGCTime >= manualGCInterval ) {
187201 double used = (double ) usage .getUsed () / usage .getMax ();
188- if (used > manualGCMemoryPressure ) {
202+ if (used > manualGCMemoryPressure || state == State . OFFLOADING ) {
189203 if (LOG .isInfoEnabled ()) {
190204 LOG .info (
191205 "High memory pressure with no full GC from the JVM. " +
@@ -231,8 +245,13 @@ public void run() {
231245 public void startIteration () {
// Reset the per-iteration counters (edge-store progress, injected OOC bytes).
232246 AbstractEdgeStore .PROGRESS_COUNTER .reset ();
233247 oocBytesInjected .set (0 );
234- memoryEstimator .clear ();
235- memoryEstimator .setCurrentSuperstep (oocEngine .getSuperstep ());
// Clear the estimator and record the new superstep while holding the lock,
// so the reset cannot interleave with other code that takes the same lock.
248+ lock .lock ();
249+ try {
250+ memoryEstimator .clear ();
251+ memoryEstimator .setCurrentSuperstep (oocEngine .getSuperstep ());
252+ } finally {
// Always release the lock, even if the reset above throws.
253+ lock .unlock ();
254+ }
// Set both fractions back to 1 (presumably "no throttling" -- TODO confirm).
236255 oocEngine .updateRequestsCreditFraction (1 );
237256 oocEngine .updateActiveThreadsFraction (1 );
238257 }
@@ -331,30 +350,36 @@ public synchronized void gcCompleted(
331350 }
332351 }
333352
334- // Number of edges loaded so far (if in input superstep)
335- long edgesLoaded = oocEngine .getSuperstep () >= 0 ? 0 :
336- EdgeInputSplitsCallable .getTotalEdgesLoadedMeter ().count ();
337- // Number of vertices loaded so far (if in input superstep)
338- long verticesLoaded = oocEngine .getSuperstep () >= 0 ? 0 :
339- VertexInputSplitsCallable .getTotalVerticesLoadedMeter ().count ();
340- // Number of vertices computed (if either in compute or store phase)
341- long verticesComputed = WorkerProgress .get ().getVerticesComputed () +
342- WorkerProgress .get ().getVerticesStored () +
343- AbstractEdgeStore .PROGRESS_COUNTER .getProgress ();
344- // Number of bytes received
345- long receivedBytes =
346- oocEngine .getNetworkMetrics ().getBytesReceivedPerSuperstep ();
347- // Number of OOC bytes
348- long oocBytes = oocBytesInjected .get ();
349-
350- memoryEstimator .addRecord (getOldGenUsed ().getUsed (), edgesLoaded ,
351- verticesLoaded , verticesComputed , receivedBytes , oocBytes );
353+ lock .lock ();
354+ try {
355+ // Number of edges loaded so far (if in input superstep)
356+ long edgesLoaded = memoryEstimator .getCurrentSuperstep () >= 0 ? 0 :
357+ EdgeInputSplitsCallable .getTotalEdgesLoadedMeter ().count ();
358+ // Number of vertices loaded so far (if in input superstep)
359+ long verticesLoaded = memoryEstimator .getCurrentSuperstep () >= 0 ? 0 :
360+ VertexInputSplitsCallable .getTotalVerticesLoadedMeter ().count ();
361+ // Number of vertices computed (if either in compute or store phase)
362+ long verticesComputed = WorkerProgress .get ().getVerticesComputed () +
363+ WorkerProgress .get ().getVerticesStored () +
364+ AbstractEdgeStore .PROGRESS_COUNTER .getProgress ();
365+ // Number of bytes received
366+ long receivedBytes =
367+ oocEngine .getNetworkMetrics ().getBytesReceivedPerSuperstep ();
368+ // Number of OOC bytes
369+ long oocBytes = oocBytesInjected .get ();
370+
371+ memoryEstimator .addRecord (getOldGenUsed ().getUsed (), edgesLoaded ,
372+ verticesLoaded , verticesComputed , receivedBytes , oocBytes );
373+ } finally {
374+ lock .unlock ();
375+ }
352376
353377 long garbage = before .getUsed () - after .getUsed ();
354378 long maxMem = after .getMax ();
355379 long memUsed = after .getUsed ();
356- boolean isTight = (maxMem - memUsed ) < 2 * gcReclaimFraction * maxMem &&
357- garbage < gcReclaimFraction * maxMem ;
380+ boolean highMemoryUsage =
381+ (maxMem - memUsed ) < 2 * gcReclaimFraction * maxMem ;
382+ boolean isTight = highMemoryUsage && garbage < gcReclaimFraction * maxMem ;
358383 boolean predictionExist = memoryEstimator .getUsageEstimate () > 0 ;
359384 if (isTight && !predictionExist ) {
360385 if (LOG .isInfoEnabled ()) {
@@ -369,6 +394,8 @@ public synchronized void gcCompleted(
369394 }
370395 state = State .OFFLOADING ;
371396 updateRates (1 , 1 );
397+ } else if (state == State .OFFLOADING && !highMemoryUsage ) {
398+ state = State .STABLE ;
372399 }
373400 }
374401 }
@@ -483,6 +510,10 @@ public void setCurrentSuperstep(long superstep) {
483510 this .currentSuperstep = superstep ;
484511 }
485512
/**
 * Get the superstep currently stored in this estimator, i.e. the value most
 * recently passed to setCurrentSuperstep.
 *
 * @return the current superstep
 */
513+ public long getCurrentSuperstep () {
514+ return this .currentSuperstep ;
515+ }
516+
486517 /**
487518 * Given the current state of computation (i.e. current edges loaded,
488519 * vertices computed etc) and the current model (i.e. the regression
@@ -787,14 +818,6 @@ private Boolean refineCoefficient(int coefIndex, double lowerBound,
787818 */
788819 private static boolean calculateRegression (double [] coefficient ,
789820 List <Integer > validColumnIndices , OLSMultipleLinearRegression mlr ) {
790-
791- if (coefficient .length != validColumnIndices .size ()) {
792- LOG .warn ("There are " + coefficient .length +
793- " coefficients, but " + validColumnIndices .size () +
794- " valid columns in the regression" );
795- return false ;
796- }
797-
798821 double [] beta = mlr .estimateRegressionParameters ();
799822 Arrays .fill (coefficient , 0 );
800823 for (int i = 0 ; i < validColumnIndices .size (); ++i ) {
0 commit comments