|
31 | 31 | import java.util.concurrent.Future; |
32 | 32 | import java.util.concurrent.ScheduledExecutorService; |
33 | 33 | import java.util.concurrent.TimeUnit; |
| 34 | +import java.util.concurrent.atomic.AtomicBoolean; |
34 | 35 |
|
35 | 36 | import com.cloud.network.Network; |
36 | 37 | import com.cloud.usage.dao.UsageNetworksDao; |
@@ -192,6 +193,7 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna |
192 | 193 | private final List<UsageVmDiskVO> usageVmDisks = new ArrayList<UsageVmDiskVO>(); |
193 | 194 |
|
194 | 195 | private final ScheduledExecutorService _executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-Job")); |
| 196 | + private final AtomicBoolean isParsingJobRunning = new AtomicBoolean(false); |
195 | 197 | private final ScheduledExecutorService _heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-HB")); |
196 | 198 | private final ScheduledExecutorService _sanityExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-Sanity")); |
197 | 199 | private Future _scheduledFuture = null; |
@@ -367,7 +369,12 @@ public void run() { |
367 | 369 | (new ManagedContextRunnable() { |
368 | 370 | @Override |
369 | 371 | protected void runInContext() { |
370 | | - runInContextInternal(); |
| 372 | + isParsingJobRunning.set(true); |
| 373 | + try { |
| 374 | + runInContextInternal(); |
| 375 | + } finally { |
| 376 | + isParsingJobRunning.set(false); |
| 377 | + } |
371 | 378 | } |
372 | 379 | }).run(); |
373 | 380 | } |
@@ -2267,9 +2274,14 @@ protected void runInContext() { |
2267 | 2274 |
|
2268 | 2275 | if ((timeSinceLastSuccessJob > 0) && (timeSinceLastSuccessJob > (aggregationDurationMillis - 100))) { |
2269 | 2276 | if (timeToJob > (aggregationDurationMillis / 2)) { |
2270 | | - logger.debug("it's been {} ms since last usage job and {} ms until next job, scheduling an immediate job to catch up (aggregation duration is {} minutes)" |
2271 | | - , timeSinceLastSuccessJob, timeToJob, _aggregationDuration); |
2272 | | - scheduleParse(); |
| 2277 | + logger.debug("Heartbeat: it's been {} ms since last finished usage job and {} ms until next job (aggregation duration is {} minutes)", |
| 2278 | + timeSinceLastSuccessJob, timeToJob, _aggregationDuration); |
| 2279 | + if (isParsingJobRunning.get()) { |
| 2280 | + logger.debug("Heartbeat: A parsing job is already running"); |
| 2281 | + } else { |
| 2282 | + logger.debug("Heartbeat: Scheduling an immediate job to catch up"); |
| 2283 | + scheduleParse(); |
| 2284 | + } |
2273 | 2285 | } |
2274 | 2286 | } |
2275 | 2287 |
|
|
0 commit comments