Skip to content

Commit c33cb78

Browse files
committed
feat(timings): derive concurrency dat from unit data
Added a post-processing that derives concurrency information (active/waiting/inactive units over time) from completed unit timing data. This enables eliminating runtime concurrency tracking.
1 parent 40179eb commit c33cb78

File tree

3 files changed

+129
-27
lines changed

3 files changed

+129
-27
lines changed

src/cargo/core/compiler/job_queue/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -887,11 +887,6 @@ impl<'gctx> DrainState<'gctx> {
887887
// Record some timing information if `--timings` is enabled, and
888888
// this'll end up being a noop if we're not recording this
889889
// information.
890-
self.timings.mark_concurrency(
891-
self.active.len(),
892-
self.pending_queue.len(),
893-
self.queue.len(),
894-
);
895890
self.timings.record_cpu();
896891

897892
let active_names = self

src/cargo/core/compiler/timings/mod.rs

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,6 @@ pub struct Timings<'gctx> {
6262
/// Units that are in the process of being built.
6363
/// When they finished, they are moved to `unit_times`.
6464
active: HashMap<JobId, UnitTime>,
65-
/// Concurrency-tracking information. This is periodically updated while
66-
/// compilation progresses.
67-
concurrency: Vec<Concurrency>,
6865
/// Last recorded state of the system's CPUs and when it happened
6966
last_cpu_state: Option<State>,
7067
last_cpu_recording: Instant,
@@ -161,7 +158,6 @@ impl<'gctx> Timings<'gctx> {
161158
unit_to_index: HashMap::new(),
162159
unit_times: Vec::new(),
163160
active: HashMap::new(),
164-
concurrency: Vec::new(),
165161
last_cpu_state: None,
166162
last_cpu_recording: Instant::now(),
167163
cpu_usage: Vec::new(),
@@ -214,7 +210,6 @@ impl<'gctx> Timings<'gctx> {
214210
unit_to_index,
215211
unit_times: Vec::new(),
216212
active: HashMap::new(),
217-
concurrency: Vec::new(),
218213
last_cpu_state,
219214
last_cpu_recording: Instant::now(),
220215
cpu_usage: Vec::new(),
@@ -384,20 +379,6 @@ impl<'gctx> Timings<'gctx> {
384379
}
385380
}
386381

387-
/// This is called periodically to mark the concurrency of internal structures.
388-
pub fn mark_concurrency(&mut self, active: usize, waiting: usize, inactive: usize) {
389-
if !self.enabled {
390-
return;
391-
}
392-
let c = Concurrency {
393-
t: self.start.elapsed().as_secs_f64(),
394-
active,
395-
waiting,
396-
inactive,
397-
};
398-
self.concurrency.push(c);
399-
}
400-
401382
/// Mark that a fresh unit was encountered. (No re-compile needed)
402383
pub fn add_fresh(&mut self) {
403384
self.total_fresh += 1;
@@ -444,7 +425,6 @@ impl<'gctx> Timings<'gctx> {
444425
if !self.enabled {
445426
return Ok(());
446427
}
447-
self.mark_concurrency(0, 0, 0);
448428
self.unit_times
449429
.sort_unstable_by(|a, b| a.start.partial_cmp(&b.start).unwrap());
450430
if self.report_html {
@@ -473,6 +453,7 @@ impl<'gctx> Timings<'gctx> {
473453
.collect::<Vec<_>>();
474454

475455
let unit_data = report::to_unit_data(&self.unit_times, &self.unit_to_index);
456+
let concurrency = report::compute_concurrency(&unit_data);
476457

477458
let ctx = report::RenderContext {
478459
start: self.start,
@@ -482,7 +463,7 @@ impl<'gctx> Timings<'gctx> {
482463
total_fresh: self.total_fresh,
483464
total_dirty: self.total_dirty,
484465
unit_data,
485-
concurrency: &self.concurrency,
466+
concurrency,
486467
cpu_usage: &self.cpu_usage,
487468
rustc_version,
488469
host: &build_runner.bcx.rustc().host,

src/cargo/core/compiler/timings/report.rs

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use std::borrow::Cow;
44
use std::collections::HashMap;
5+
use std::collections::HashSet;
56
use std::io::Write;
67
use std::time::Instant;
78

@@ -100,7 +101,7 @@ pub struct RenderContext<'a> {
100101
pub unit_data: Vec<UnitData>,
101102
/// Concurrency-tracking information. This is periodically updated while
102103
/// compilation progresses.
103-
pub concurrency: &'a [Concurrency],
104+
pub concurrency: Vec<Concurrency>,
104105
/// Recorded CPU states, stored as tuples. First element is when the
105106
/// recording was taken and second element is percentage usage of the
106107
/// system.
@@ -391,6 +392,131 @@ pub(super) fn to_unit_data(
391392
.collect()
392393
}
393394

395+
/// Derives concurrency information from unit timing data.
396+
pub(super) fn compute_concurrency(unit_data: &[UnitData]) -> Vec<Concurrency> {
397+
if unit_data.is_empty() {
398+
return Vec::new();
399+
}
400+
401+
let unit_by_index: HashMap<_, _> = unit_data.iter().map(|u| (u.i, u)).collect();
402+
403+
enum UnblockedBy {
404+
Rmeta(u64),
405+
Full(u64),
406+
}
407+
408+
// unit_id -> unit that unblocks it.
409+
let mut unblocked_by: HashMap<_, _> = HashMap::new();
410+
for unit in unit_data {
411+
for id in unit.unblocked_rmeta_units.iter() {
412+
assert!(
413+
unblocked_by
414+
.insert(*id, UnblockedBy::Rmeta(unit.i))
415+
.is_none()
416+
);
417+
}
418+
419+
for id in unit.unblocked_units.iter() {
420+
assert!(
421+
unblocked_by
422+
.insert(*id, UnblockedBy::Full(unit.i))
423+
.is_none()
424+
);
425+
}
426+
}
427+
428+
let ready_time = |unit: &UnitData| -> Option<f64> {
429+
let dep = unblocked_by.get(&unit.i)?;
430+
match dep {
431+
UnblockedBy::Rmeta(id) => {
432+
let dep = unit_by_index.get(id)?;
433+
let duration = dep.sections.iter().flatten().find_map(|(name, section)| {
434+
matches!(name, SectionName::Frontend).then_some(section.end)
435+
});
436+
437+
Some(dep.start + duration.unwrap_or(dep.duration))
438+
}
439+
UnblockedBy::Full(id) => {
440+
let dep = unit_by_index.get(id)?;
441+
Some(dep.start + dep.duration)
442+
}
443+
}
444+
};
445+
446+
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
447+
enum State {
448+
Ready,
449+
Start,
450+
End,
451+
}
452+
453+
let mut events: Vec<_> = unit_data
454+
.iter()
455+
.flat_map(|unit| {
456+
// Adding rounded numbers may cause ready > start,
457+
// so cap with unit.start here to be defensive.
458+
let ready = ready_time(unit).unwrap_or(unit.start).min(unit.start);
459+
460+
[
461+
(ready, State::Ready, unit.i),
462+
(unit.start, State::Start, unit.i),
463+
(unit.start + unit.duration, State::End, unit.i),
464+
]
465+
})
466+
.collect();
467+
468+
events.sort_by(|a, b| {
469+
a.0.partial_cmp(&b.0)
470+
.unwrap()
471+
.then_with(|| a.1.cmp(&b.1))
472+
.then_with(|| a.2.cmp(&b.2))
473+
});
474+
475+
let mut concurrency: Vec<Concurrency> = Vec::new();
476+
let mut inactive: HashSet<u64> = unit_data.iter().map(|unit| unit.i).collect();
477+
let mut waiting: HashSet<u64> = HashSet::new();
478+
let mut active: HashSet<u64> = HashSet::new();
479+
480+
for (t, state, unit_id) in events {
481+
match state {
482+
State::Ready => {
483+
inactive.remove(&unit_id);
484+
waiting.insert(unit_id);
485+
active.remove(&unit_id);
486+
}
487+
State::Start => {
488+
inactive.remove(&unit_id);
489+
waiting.remove(&unit_id);
490+
active.insert(unit_id);
491+
}
492+
State::End => {
493+
inactive.remove(&unit_id);
494+
waiting.remove(&unit_id);
495+
active.remove(&unit_id);
496+
}
497+
}
498+
499+
let record = Concurrency {
500+
t,
501+
active: active.len(),
502+
waiting: waiting.len(),
503+
inactive: inactive.len(),
504+
};
505+
506+
if let Some(last) = concurrency.last_mut()
507+
&& last.t == t
508+
{
509+
// We don't want to draw long vertical lines at the same timestamp,
510+
// so we keep only the latest state.
511+
*last = record;
512+
} else {
513+
concurrency.push(record);
514+
}
515+
}
516+
517+
concurrency
518+
}
519+
394520
/// Aggregates section timing information from individual compilation sections.
395521
///
396522
/// We can have a bunch of situations here.

0 commit comments

Comments
 (0)