diff --git a/crates/tako/src/internal/server/core.rs b/crates/tako/src/internal/server/core.rs index 76431fb8a..e9ffe19ff 100644 --- a/crates/tako/src/internal/server/core.rs +++ b/crates/tako/src/internal/server/core.rs @@ -124,11 +124,6 @@ impl Core { (&mut self.tasks, &mut self.data_objects) } - #[cfg(test)] - pub fn get_resource_map_mut(&mut self) -> &mut GlobalResourceMapping { - &mut self.resource_map - } - pub fn new_worker_id(&mut self) -> WorkerId { self.worker_id_counter += 1; WorkerId::new(self.worker_id_counter) @@ -619,7 +614,9 @@ mod tests { use crate::internal::server::task::TaskRuntimeState; use crate::internal::server::worker::Worker; use crate::internal::server::workergroup::WorkerGroup; - use crate::internal::tests::utils::task; + + use crate::tests::utils::env::TestEnv; + use crate::{TaskId, WorkerId}; impl Core { @@ -700,15 +697,13 @@ mod tests { #[test] fn add_remove() { - let mut core = Core::default(); - let rmap = core.get_resource_map_mut(); - let t = task::task(101, rmap); - core.add_task(t); + let mut rt = TestEnv::new(); + rt.new_task_default(101); let mut objs_to_remove = ObjsToRemoveFromWorkers::new(); assert!(matches!( - core.remove_task(101.into(), &mut objs_to_remove), + rt.core().remove_task(101.into(), &mut objs_to_remove), TaskRuntimeState::Waiting(_) )); - assert_eq!(core.find_task(101.into()), None); + assert_eq!(rt.core().find_task(101.into()), None); } } diff --git a/crates/tako/src/internal/server/explain.rs b/crates/tako/src/internal/server/explain.rs index d28027c1a..847460ff6 100644 --- a/crates/tako/src/internal/server/explain.rs +++ b/crates/tako/src/internal/server/explain.rs @@ -144,79 +144,72 @@ pub fn task_explain_for_worker( #[cfg(test)] mod tests { - use crate::internal::common::resources::map::GlobalResourceMapping; + use crate::internal::server::explain::{TaskExplainItem, task_explain_for_worker}; - use crate::internal::server::worker::Worker; + use crate::internal::server::workergroup::WorkerGroup; - use crate::internal::tests::utils::schedule::create_test_worker_config; use crate::internal::tests::utils::task::TaskBuilder; - use crate::resources::{ - ResourceAmount, ResourceDescriptor, ResourceDescriptorItem, ResourceIdMap, - }; - use crate::{Set, WorkerId}; + + use crate::resources::ResourceAmount; + use crate::tests::utils::env::TestEnv; + use crate::tests::utils::worker::WorkerBuilder; + use crate::{Set, TaskId, WorkerId}; use std::time::{Duration, Instant}; #[test] fn explain_single_node() { - let mut rqs = GlobalResourceMapping::default(); - let resource_map = ResourceIdMap::from_vec(vec!["cpus".to_string(), "gpus".to_string()]); - let now = Instant::now(); - - let wcfg = create_test_worker_config(1.into(), ResourceDescriptor::simple_cpus(4)); - let worker1 = Worker::new(1.into(), wcfg, &resource_map, now); - - let mut wcfg = create_test_worker_config( - 2.into(), - ResourceDescriptor::new( - vec![ - ResourceDescriptorItem::range("cpus", 1, 10), - ResourceDescriptorItem::range("gpus", 1, 4), - ], - Default::default(), - ), + let mut rt = TestEnv::new(); + rt.new_named_resource("gpus"); + rt.new_worker_with_id(1, &WorkerBuilder::new(4)); + rt.new_worker_with_id( + 2, + &WorkerBuilder::empty() + .res_range("cpus", 1, 10) + .res_range("gpus", 1, 4) + .time_limit(Duration::from_secs(40_000)), ); - wcfg.time_limit = Some(Duration::from_secs(40_000)); - let worker2 = Worker::new(2.into(), wcfg, &resource_map, now); - let explain = |task, rqs: &GlobalResourceMapping, worker, now| { + let resource_map = rt.core().create_resource_map(); 
+ let explain = |rt: &mut TestEnv, task, worker, now| { let group = WorkerGroup::new(Set::new()); + let (task_map, worker_map, rqs) = rt.core().split_tasks_workers_requests_mut(); task_explain_for_worker( &resource_map, - rqs.get_resource_rq_map(), - task, - worker, + rqs, + task_map.get_task(TaskId::new_test(task)), + worker_map.get_worker(WorkerId::new(worker)), &group, now, ) }; - let task_id = 1; - let task = TaskBuilder::new(task_id).build(&mut rqs); - let r = explain(&task, &rqs, &worker1, now); + let _rqs = rt.core().resource_map_mut(); + let now = Instant::now(); + rt.new_task(1, &TaskBuilder::new()); + let (_task_map, _worker_map, _rqs) = rt.core().split_tasks_workers_requests_mut(); + let r = explain(&mut rt, 1, 1, now); assert_eq!(r.variants.len(), 1); assert_eq!(r.variants[0].len(), 1); assert_eq!(r.n_enabled_variants(), 1); - let task = TaskBuilder::new(task_id) - .time_request(20_000) - .build(&mut rqs); - let r = explain(&task, &rqs, &worker1, now); + rt.new_task(2, &TaskBuilder::new().time_request(20_000)); + let r = explain(&mut rt, 2, 1, now); assert_eq!(r.variants.len(), 1); assert_eq!(r.variants[0].len(), 2); assert_eq!(r.n_enabled_variants(), 1); - let r = explain(&task, &rqs, &worker2, now); + let r = explain(&mut rt, 2, 2, now); assert_eq!(r.variants.len(), 1); assert_eq!(r.variants[0].len(), 2); assert_eq!(r.n_enabled_variants(), 1); let now2 = now + Duration::from_secs(21_000); - let r = explain(&task, &rqs, &worker1, now2); + let r = explain(&mut rt, 2, 1, now2); assert_eq!(r.variants.len(), 1); assert_eq!(r.variants[0].len(), 2); assert_eq!(r.n_enabled_variants(), 1); - let r = explain(&task, &rqs, &worker2, now2); + let r = explain(&mut rt, 2, 2, now2); assert_eq!(r.variants.len(), 1); assert_eq!(r.variants[0].len(), 2); assert!(matches!( @@ -224,16 +217,18 @@ mod tests { TaskExplainItem::Time { min_time, remaining_time, - } if min_time == Duration::from_secs(20_000) && remaining_time == Some(Duration::from_secs(19_000)) + } if min_time == Duration::from_secs(20_000) && (remaining_time.unwrap().as_secs().abs_diff(19_000) < 3) )); assert_eq!(r.n_enabled_variants(), 0); - let task = TaskBuilder::new(task_id) - .time_request(20_000) - .cpus_compact(30) - .add_resource(1, 3) - .build(&mut rqs); - let r = explain(&task, &rqs, &worker2, now); + rt.new_task( + 3, + &TaskBuilder::new() + .time_request(20_000) + .cpus(30) + .add_resource(1, 3), + ); + let r = explain(&mut rt, 3, 2, now); assert_eq!(r.variants.len(), 1); assert_eq!(r.variants[0].len(), 3); assert!(matches!( @@ -244,25 +239,27 @@ mod tests { )); assert_eq!(r.n_enabled_variants(), 0); - let task = TaskBuilder::new(task_id) - .time_request(30_000) - .cpus_compact(15) - .add_resource(1, 8) - .next_resources() - .cpus_compact(2) - .add_resource(1, 32) - .build(&mut rqs); - let r = explain(&task, &rqs, &worker2, now2); + rt.new_task( + 4, + &TaskBuilder::new() + .time_request(30_000) + .cpus(15) + .add_resource(1, 8) + .next_resources() + .cpus(2) + .add_resource(1, 32), + ); + let r = explain(&mut rt, 4, 2, now2); assert_eq!(r.variants.len(), 2); assert_eq!(r.variants[0].len(), 3); assert_eq!(r.variants[1].len(), 2); + assert!(matches!( r.variants[0][0], TaskExplainItem::Time { min_time, remaining_time, - } if min_time == Duration::from_secs(30_000) && remaining_time == Some(Duration::from_secs(19_000)) - )); + } if min_time == Duration::from_secs(30_000) && (remaining_time.unwrap().as_secs().abs_diff(19_000) < 3))); assert!(matches!( &r.variants[0][1], TaskExplainItem::Resources { @@ -285,24 +282,25 @@ mod 
tests { #[test] fn explain_multi_node() { - let mut rqs = GlobalResourceMapping::default(); - let resource_map = ResourceIdMap::from_vec(vec!["cpus".to_string(), "gpus".to_string()]); let now = Instant::now(); + let mut rt = TestEnv::new(); + + rt.new_worker_with_id(1, &WorkerBuilder::new(4)); - let wcfg = create_test_worker_config(1.into(), ResourceDescriptor::simple_cpus(4)); - let worker = Worker::new(1.into(), wcfg, &resource_map, now); - let task = TaskBuilder::new(1).n_nodes(4).build(&mut rqs); + let _task = rt.new_task(1, &TaskBuilder::new().n_nodes(4)); let mut wset = Set::new(); wset.insert(WorkerId::new(1)); wset.insert(WorkerId::new(2)); wset.insert(WorkerId::new(3)); wset.insert(WorkerId::new(132)); let group = WorkerGroup::new(wset); + let resource_map = rt.core().create_resource_map(); + let (task_map, worker_map, rqs) = rt.core().split_tasks_workers_requests_mut(); let r = task_explain_for_worker( &resource_map, - rqs.get_resource_rq_map(), - &task, - &worker, + rqs, + task_map.get_task(TaskId::new_test(1)), + worker_map.get_worker(WorkerId::new(1)), &group, now, ); @@ -316,9 +314,9 @@ mod tests { let group = WorkerGroup::new(wset); let r = task_explain_for_worker( &resource_map, - rqs.get_resource_rq_map(), - &task, - &worker, + rqs, + task_map.get_task(TaskId::new_test(1)), + worker_map.get_worker(WorkerId::new(1)), &group, now, ); diff --git a/crates/tako/src/internal/server/task.rs b/crates/tako/src/internal/server/task.rs index 1ea45d0cf..8ecc50bea 100644 --- a/crates/tako/src/internal/server/task.rs +++ b/crates/tako/src/internal/server/task.rs @@ -502,12 +502,11 @@ fn estimate_shared_data_size(data: &ComputeTaskSharedData) -> usize { #[cfg(test)] mod tests { - use crate::internal::common::resources::map::GlobalResourceMapping; - use crate::internal::server::core::Core; + use crate::internal::server::task::{Task, TaskRuntimeState}; - use crate::internal::tests::utils::schedule::submit_test_tasks; - use crate::internal::tests::utils::task; - use crate::internal::tests::utils::task::task_with_deps; + + use crate::tests::utils::env::TestEnv; + use crate::tests::utils::task::TaskBuilder; use std::default::Default; impl Task { @@ -521,29 +520,27 @@ mod tests { #[test] fn task_consumers_empty() { - let mut rmap = GlobalResourceMapping::default(); - let a = task::task(0, &mut rmap); + let mut rt = TestEnv::new(); + let a = rt.new_task_default(0); let mut s = crate::Set::new(); - a.collect_recursive_consumers(&Default::default(), &mut s); + rt.task(a) + .collect_recursive_consumers(&Default::default(), &mut s); assert!(s.is_empty()); } #[test] fn task_recursive_consumers() { - let mut core = Core::default(); - let rmap = core.get_resource_map_mut(); - let a = task::task(0, rmap); - let b = task_with_deps(1, &[&a], rmap); - let c = task_with_deps(2, &[&b], rmap); - let d = task_with_deps(3, &[&b], rmap); - let e = task_with_deps(4, &[&c, &d], rmap); - - let expected_ids = vec![b.id, c.id, d.id, e.id]; - submit_test_tasks(&mut core, vec![a, b, c, d, e]); - + let mut rt = TestEnv::new(); + let a = rt.new_task_default(0); + let b = rt.new_task(1, &TaskBuilder::new().task_deps(&[a])); + let c = rt.new_task(2, &TaskBuilder::new().task_deps(&[b])); + let d = rt.new_task(3, &TaskBuilder::new().task_deps(&[b])); + let e = rt.new_task(4, &TaskBuilder::new().task_deps(&[c, d])); + + let expected_ids = vec![b, c, d, e]; let mut s = crate::Set::new(); - core.get_task(0.into()) - .collect_recursive_consumers(core.task_map(), &mut s); + let tasks = rt.task_map(); + 
rt.task(0).collect_recursive_consumers(tasks, &mut s); assert_eq!(s, expected_ids.into_iter().collect()); } } diff --git a/crates/tako/src/internal/tests/test_query.rs b/crates/tako/src/internal/tests/test_query.rs index a1e31a2d3..1f988cfb4 100644 --- a/crates/tako/src/internal/tests/test_query.rs +++ b/crates/tako/src/internal/tests/test_query.rs @@ -4,12 +4,10 @@ use crate::internal::scheduler::query::compute_new_worker_query; use crate::internal::server::core::Core; use crate::internal::server::reactor::on_cancel_tasks; use crate::internal::tests::utils::env::{TestEnv, create_test_comm}; -use crate::internal::tests::utils::schedule::{ - create_test_scheduler, create_test_workers, submit_test_tasks, -}; +use crate::internal::tests::utils::schedule::create_test_scheduler; use crate::internal::tests::utils::task::TaskBuilder; use crate::resources::{ResourceDescriptor, ResourceDescriptorItem, ResourceDescriptorKind}; -use crate::tests::utils::schedule::create_test_worker; +use crate::tests::utils::worker::WorkerBuilder; use std::time::Duration; #[test] @@ -32,22 +30,19 @@ fn test_query_no_tasks() { #[test] fn test_query_enough_workers() { - let mut core = Core::default(); - - create_test_workers(&mut core, &[2, 3]); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[2, 3]); - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(1).cpus_compact(3).build(rmap); - let t2 = TaskBuilder::new(2).cpus_compact(1).build(rmap); - let t3 = TaskBuilder::new(3).cpus_compact(1).build(rmap); - submit_test_tasks(&mut core, vec![t1, t2, t3]); + rt.new_task_cpus(1, 3); + rt.new_task_cpus(2, 1); + rt.new_task_cpus(3, 1); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); - scheduler.run_scheduling(&mut core, &mut comm); + scheduler.run_scheduling(rt.core(), &mut comm); let r = compute_new_worker_query( - &mut core, + rt.core(), &[WorkerTypeQuery { partial: false, descriptor: ResourceDescriptor::simple_cpus(4), @@ -63,22 +58,19 @@ fn test_query_enough_workers() { #[test] fn test_query_no_enough_workers1() { - let mut core = Core::default(); - - create_test_workers(&mut core, &[2, 3]); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[2, 3]); - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(1).cpus_compact(3).build(rmap); - let t2 = TaskBuilder::new(2).cpus_compact(3).build(rmap); - let t3 = TaskBuilder::new(3).cpus_compact(1).build(rmap); - submit_test_tasks(&mut core, vec![t1, t2, t3]); + rt.new_task_cpus(1, 3); + rt.new_task_cpus(2, 3); + rt.new_task_cpus(3, 1); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); - scheduler.run_scheduling(&mut core, &mut comm); + scheduler.run_scheduling(rt.core(), &mut comm); let r = compute_new_worker_query( - &mut core, + rt.core(), &[ WorkerTypeQuery { partial: false, @@ -106,10 +98,10 @@ fn test_query_no_enough_workers1() { fn test_query_enough_workers2() { let mut rt = TestEnv::new(); - rt.new_workers(&[2]); + rt.new_workers_cpus(&[2]); - rt.new_task_running(TaskBuilder::new(10).cpus_compact(1), 100); - rt.new_task_assigned(TaskBuilder::new(11).cpus_compact(1), 100); + rt.new_task_running(10, &TaskBuilder::new(), 100); + rt.new_task_assigned(11, &TaskBuilder::new(), 100); rt.schedule(); let r = compute_new_worker_query( @@ -141,11 +133,12 @@ fn test_query_enough_workers2() { fn test_query_not_enough_workers3() { let mut rt = TestEnv::new(); - rt.new_workers(&[2]); + rt.new_workers_cpus(&[2]); - rt.new_task_running(TaskBuilder::new(10).cpus_compact(1), 100); - 
rt.new_task_running(TaskBuilder::new(12).cpus_compact(1), 100); - rt.new_task_assigned(TaskBuilder::new(11).cpus_compact(1), 100); + let t = TaskBuilder::new().cpus(1); + rt.new_task_running(10, &t, 100); + rt.new_task_running(12, &t, 100); + rt.new_task_assigned(11, &t, 100); rt.schedule(); let r = compute_new_worker_query( @@ -177,10 +170,10 @@ fn test_query_not_enough_workers3() { fn test_query_many_workers_needed() { let mut rt = TestEnv::new(); - rt.new_workers(&[4, 4, 4]); + rt.new_workers_cpus(&[4, 4, 4]); for i in 1..=100 { - rt.new_task(TaskBuilder::new(i).cpus_compact(1)); + rt.new_task_default(i); } rt.schedule(); @@ -221,21 +214,21 @@ fn test_query_many_workers_needed() { fn test_query_multi_node_tasks() { let mut rt = TestEnv::new(); - rt.new_workers(&[4, 4, 4]); + rt.new_workers_cpus(&[4, 4, 4]); for i in 0..5 { - rt.new_task(TaskBuilder::new(i).n_nodes(3)); + rt.new_task(i, &TaskBuilder::new().n_nodes(3)); } for i in 5..15 { - rt.new_task(TaskBuilder::new(i).n_nodes(6)); + rt.new_task(i, &TaskBuilder::new().n_nodes(6)); } for i in 15..20 { - rt.new_task(TaskBuilder::new(i).n_nodes(12)); + rt.new_task(i, &TaskBuilder::new().n_nodes(12)); } for i in 20..40 { - rt.new_task(TaskBuilder::new(i).n_nodes(3).user_priority(10)); + rt.new_task(i, &TaskBuilder::new().n_nodes(3).user_priority(10)); } - rt.new_task(TaskBuilder::new(1000).n_nodes(1)); + rt.new_task(1000, &TaskBuilder::new().n_nodes(1)); rt.schedule(); @@ -279,7 +272,7 @@ fn test_query_multi_node_tasks() { fn test_query_multi_node_time_limit() { let mut rt = TestEnv::new(); - rt.new_task(TaskBuilder::new(1).n_nodes(4).time_request(750)); + rt.new_task(1, &TaskBuilder::new().n_nodes(4).time_request(750)); rt.schedule(); for (secs, allocs) in [(740, 0), (760, 1)] { @@ -300,17 +293,14 @@ fn test_query_multi_node_time_limit() { #[test] fn test_query_min_utilization1() { - let mut core = Core::default(); - - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(1).cpus_compact(3).build(rmap); - let t2 = TaskBuilder::new(2).cpus_compact(1).build(rmap); - let t3 = TaskBuilder::new(3).cpus_compact(1).build(rmap); - submit_test_tasks(&mut core, vec![t1, t2, t3]); + let mut rt = TestEnv::new(); + rt.new_task_cpus(1, 3); + rt.new_task_cpus(2, 1); + rt.new_task_cpus(3, 1); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); - scheduler.run_scheduling(&mut core, &mut comm); + scheduler.run_scheduling(&mut rt.core(), &mut comm); for (min_utilization, alloc_value, cpus) in &[ (0.5, 0, 12), @@ -321,7 +311,7 @@ fn test_query_min_utilization1() { (0.7, 1, 3), ] { let r = compute_new_worker_query( - &mut core, + &mut rt.core(), &[WorkerTypeQuery { partial: false, descriptor: ResourceDescriptor::simple_cpus(*cpus), @@ -338,22 +328,14 @@ fn test_query_min_utilization1() { #[test] fn test_query_min_utilization2() { - let mut core = Core::default(); - - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(1) - .cpus_compact(1) - .add_resource(1, 10) - .build(rmap); - let t2 = TaskBuilder::new(2) - .cpus_compact(1) - .add_resource(1, 10) - .build(rmap); - submit_test_tasks(&mut core, vec![t1, t2]); + let mut rt = TestEnv::new(); + let t = TaskBuilder::new().cpus(1).add_resource(1, 10); + rt.new_task(1, &t); + rt.new_task(2, &t); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); - scheduler.run_scheduling(&mut core, &mut comm); + scheduler.run_scheduling(rt.core(), &mut comm); for (min_utilization, alloc_value, cpus, gpus) in &[ (0.5, 1, 12, 30), @@ -375,7 
+357,7 @@ fn test_query_min_utilization2() { Default::default(), ); let r = compute_new_worker_query( - &mut core, + rt.core(), &[WorkerTypeQuery { partial: false, descriptor, @@ -392,12 +374,10 @@ fn test_query_min_utilization2() { #[test] fn test_query_min_utilization3() { - let mut core = Core::default(); - - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(1).cpus_compact(2).build(rmap); - let t2 = TaskBuilder::new(2).cpus_compact(2).build(rmap); - submit_test_tasks(&mut core, vec![t1, t2]); + let mut rt = TestEnv::new(); + let t = TaskBuilder::new().cpus(2); + rt.new_task(1, &t); + rt.new_task(2, &t); let descriptor = ResourceDescriptor::new( vec![ResourceDescriptorItem { @@ -407,7 +387,7 @@ fn test_query_min_utilization3() { Default::default(), ); let r = compute_new_worker_query( - &mut core, + rt.core(), &[WorkerTypeQuery { partial: false, descriptor, @@ -437,26 +417,17 @@ fn test_query_min_utilization_vs_partial() { (0, 3, 1), (0, 0, 0), ] { - let mut core = Core::default(); - let rmap = core.get_resource_map_mut(); - let tasks: Vec<_> = (1..=cpu_tasks) - .map(|task_id| TaskBuilder::new(task_id).cpus_compact(2).build(rmap)) - .collect(); - if !tasks.is_empty() { - submit_test_tasks(&mut core, tasks); - } - let rmap = core.get_resource_map_mut(); - let tasks: Vec<_> = (10..10 + gpu_tasks) - .map(|task_id| { - TaskBuilder::new(task_id) - .cpus_compact(2) - .add_resource(1, 1) - .build(rmap) - }) - .collect(); - if !tasks.is_empty() { - submit_test_tasks(&mut core, tasks); - } + let mut rt = TestEnv::new(); + let t = TaskBuilder::new().cpus(2); + (1..=cpu_tasks).for_each(|task_id| { + rt.new_task(task_id, &t); + }); + let t = TaskBuilder::new().cpus(2).add_resource(1, 1); + + (10..10 + gpu_tasks).for_each(|task_id| { + rt.new_task(task_id, &t); + }); + let descriptor = ResourceDescriptor::new( vec![ResourceDescriptorItem { name: "cpus".into(), @@ -465,7 +436,7 @@ fn test_query_min_utilization_vs_partial() { Default::default(), ); let r = compute_new_worker_query( - &mut core, + rt.core(), &[WorkerTypeQuery { partial: true, descriptor, @@ -482,20 +453,18 @@ fn test_query_min_utilization_vs_partial() { #[test] fn test_query_min_time2() { - let mut core = Core::default(); - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(1) - .cpus_compact(1) + let mut rt = TestEnv::new(); + let t1 = TaskBuilder::new() + .cpus(1) .time_request(100) .next_resources() - .cpus_compact(4) - .time_request(50) - .build(rmap); - submit_test_tasks(&mut core, vec![t1]); + .cpus(4) + .time_request(50); + rt.new_task(1, &t1); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); - scheduler.run_scheduling(&mut core, &mut comm); + scheduler.run_scheduling(rt.core(), &mut comm); for (cpus, secs, alloc) in [(2, 75, 0), (1, 100, 1), (4, 50, 1)] { let descriptor = ResourceDescriptor::new( @@ -506,7 +475,7 @@ fn test_query_min_time2() { Default::default(), ); let r = compute_new_worker_query( - &mut core, + rt.core(), &[WorkerTypeQuery { partial: false, descriptor: descriptor.clone(), @@ -523,21 +492,13 @@ fn test_query_min_time2() { #[test] fn test_query_min_time1() { - let mut core = Core::default(); - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(1) - .cpus_compact(1) - .time_request(100) - .build(rmap); - let t2 = TaskBuilder::new(2) - .cpus_compact(10) - .time_request(100) - .build(rmap); - submit_test_tasks(&mut core, vec![t1, t2]); + let mut rt = TestEnv::new(); + rt.new_task(1, &TaskBuilder::new().cpus(1).time_request(100)); + 
rt.new_task(2, &TaskBuilder::new().cpus(10).time_request(100)); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); - scheduler.run_scheduling(&mut core, &mut comm); + scheduler.run_scheduling(rt.core(), &mut comm); let descriptor = ResourceDescriptor::new( vec![ResourceDescriptorItem { @@ -547,7 +508,7 @@ fn test_query_min_time1() { Default::default(), ); let r = compute_new_worker_query( - &mut core, + rt.core(), &[WorkerTypeQuery { partial: false, descriptor: descriptor.clone(), @@ -561,7 +522,7 @@ fn test_query_min_time1() { assert!(r.multi_node_allocations.is_empty()); let r = compute_new_worker_query( - &mut core, + &mut rt.core(), &[WorkerTypeQuery { partial: false, descriptor: descriptor.clone(), @@ -582,7 +543,7 @@ fn test_query_min_time1() { Default::default(), ); let r = compute_new_worker_query( - &mut core, + rt.core(), &[WorkerTypeQuery { partial: false, descriptor, @@ -601,9 +562,9 @@ fn test_query_sn_leftovers1() { for (n, m) in [(1, 0), (4, 0), (8, 0), (9, 1), (12, 1)] { let mut rt = TestEnv::new(); - rt.new_workers(&[4]); + rt.new_workers_cpus(&[4]); for i in 1..=n { - rt.new_task(TaskBuilder::new(i).cpus_compact(1).time_request(5_000)); + rt.new_task(i, &TaskBuilder::new().cpus(1).time_request(5_000)); } rt.schedule(); @@ -637,7 +598,7 @@ fn test_query_sn_leftovers2() { for (cpus, out) in [(1, 0), (2, 3)] { let mut rt = TestEnv::new(); for i in 1..=100 { - rt.new_task(TaskBuilder::new(i).cpus_compact(2)); + rt.new_task_cpus(i, 2); } rt.schedule(); @@ -660,8 +621,8 @@ fn test_query_sn_leftovers2() { fn test_query_sn_leftovers() { let mut rt = TestEnv::new(); - rt.new_task(TaskBuilder::new(1).cpus_compact(4).time_request(750)); - rt.new_task(TaskBuilder::new(2).cpus_compact(8).time_request(1750)); + rt.new_task(1, &TaskBuilder::new().cpus(4).time_request(750)); + rt.new_task(2, &TaskBuilder::new().cpus(8).time_request(1750)); rt.schedule(); let r = compute_new_worker_query( @@ -700,9 +661,9 @@ fn test_query_sn_leftovers() { fn test_query_partial_query_cpus() { let mut rt = TestEnv::new(); - rt.new_task(TaskBuilder::new(1).cpus_compact(4)); + rt.new_task_cpus(1, 4); for i in 2..=5 { - rt.new_task(TaskBuilder::new(i).cpus_compact(8)); + rt.new_task_cpus(i, 8); } rt.schedule(); @@ -749,12 +710,12 @@ fn test_query_partial_query_gpus1() { (100, true, 1), ] { let mut rt = TestEnv::new(); + let mut builder = TaskBuilder::new().cpus(1).add_resource(1, 2); + if has_extra { + builder = builder.add_resource(2, 1); + } for i in 1..=10 { - let mut builder = TaskBuilder::new(i).cpus_compact(1).add_resource(1, 2); - if has_extra { - builder = builder.add_resource(2, 1); - } - rt.new_task(builder); + rt.new_task(i, &builder); } rt.schedule(); @@ -788,10 +749,10 @@ fn test_query_partial_query_gpus1() { #[test] fn test_query_unknown_do_not_add_extra() { let mut rt = TestEnv::new(); - rt.new_task(TaskBuilder::new(1).cpus_compact(1)); - rt.new_task(TaskBuilder::new(2).cpus_compact(1).add_resource(1, 1)); - rt.new_task(TaskBuilder::new(3).cpus_compact(1)); - rt.new_task(TaskBuilder::new(4).cpus_compact(1).add_resource(1, 1)); + rt.new_task_default(1); + rt.new_task(2, &TaskBuilder::new().cpus(1).add_resource(1, 1)); + rt.new_task_default(3); + rt.new_task(4, &TaskBuilder::new().cpus(1).add_resource(1, 1)); let r = compute_new_worker_query( rt.core(), @@ -810,10 +771,8 @@ fn test_query_unknown_do_not_add_extra() { #[test] fn test_query_after_task_cancel() { let mut rt = TestEnv::new(); - let rmap = rt.core().get_resource_map_mut(); - let t1 = 
TaskBuilder::new(1).cpus_compact(10).build(rmap); - submit_test_tasks(rt.core(), vec![t1]); - create_test_worker(rt.core(), 102.into(), 1); + rt.new_task_cpus(1, 10); + rt.new_worker_with_id(102, &WorkerBuilder::new(1)); rt.schedule(); let mut comm = create_test_comm(); on_cancel_tasks(rt.core(), &mut comm, &[TaskId::new_test(1)]); diff --git a/crates/tako/src/internal/tests/test_reactor.rs b/crates/tako/src/internal/tests/test_reactor.rs index 7d4a4e5a5..11ffc5a1a 100644 --- a/crates/tako/src/internal/tests/test_reactor.rs +++ b/crates/tako/src/internal/tests/test_reactor.rs @@ -18,19 +18,20 @@ use crate::internal::server::task::{Task, TaskRuntimeState}; use crate::internal::server::worker::Worker; use crate::internal::tests::utils::env::create_test_comm; use crate::internal::tests::utils::schedule::{ - assign_to_worker, create_test_scheduler, create_test_worker, create_test_workers, - finish_on_worker, force_assign, set_as_running, start_and_finish_on_worker, - start_mn_task_on_worker, start_on_worker_running, submit_test_tasks, + assign_to_worker, create_test_scheduler, finish_on_worker, force_assign, set_as_running, + start_and_finish_on_worker, start_mn_task_on_worker, start_on_worker_running, }; use crate::internal::tests::utils::shared::{res_kind_groups, res_kind_sum}; use crate::internal::tests::utils::sorted_vec; -use crate::internal::tests::utils::task::{TaskBuilder, task, task_running_msg, task_with_deps}; +use crate::internal::tests::utils::task::{TaskBuilder, task_running_msg}; use crate::internal::tests::utils::workflows::{submit_example_1, submit_example_3}; use crate::internal::worker::configuration::{ DEFAULT_MAX_DOWNLOAD_TRIES, DEFAULT_MAX_PARALLEL_DOWNLOADS, DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, OverviewConfiguration, }; use crate::resources::{ResourceAmount, ResourceDescriptorItem, ResourceIdMap}; +use crate::tests::utils::env::TestEnv; +use crate::tests::utils::worker::WorkerBuilder; use crate::worker::{ServerLostPolicy, WorkerConfiguration}; use crate::{Priority, TaskId, WorkerId}; @@ -150,73 +151,75 @@ fn test_worker_add() { #[test] fn test_scheduler_priority() { - let mut core = Core::default(); - let rmap = core.get_resource_map_mut(); - let mut comm = create_test_comm(); + let mut rt = TestEnv::new(); //new_workers(&mut core, &mut comm, vec![1]); - let t1 = task(501, rmap); - let t2 = task_with_deps(502, &[&t1], rmap); - let t3 = task(503, rmap); - let t4 = task_with_deps(504, &[&t2], rmap); + let t = TaskBuilder::new(); - let task_id5 = TaskId::new(123.into(), 1.into()); - let t5 = TaskBuilder::new(task_id5).build(rmap); - let task_id6 = TaskId::new(122.into(), 0.into()); - let t6 = TaskBuilder::new(task_id6).build(rmap); - let task_id7 = TaskId::new(123.into(), 2.into()); - let t7 = TaskBuilder::new(task_id7).task_deps(&[&t5]).build(rmap); - let task_id8 = TaskId::new(123.into(), 4.into()); - let t8 = TaskBuilder::new(task_id8).build(rmap); + let t1 = rt.new_task(501, &t); + rt.new_task(502, &TaskBuilder::new().task_deps(&[t1])); + let t3 = rt.new_task(503, &t); + rt.new_task(504, &TaskBuilder::new().task_deps(&[t3])); - on_new_tasks(&mut core, &mut comm, vec![t1, t2, t3, t4, t5, t6, t7, t8]); + rt.new_task_default(TaskId::new(123.into(), 1.into())); + let t5 = rt.new_task_default(TaskId::new(122.into(), 0.into())); + + rt.new_task( + TaskId::new(123.into(), 2.into()), + &TaskBuilder::new().task_deps(&[t5]), + ); + let t8 = rt.new_task_default(TaskId::new(123.into(), 4.into())); assert_eq!( - core.get_task(TaskId::new_test(501)).priority(), + 
rt.task(TaskId::new_test(501)).priority(), Priority::from_user_priority(0.into()).add_inverted_priority_u32(0) ); assert_eq!( - core.get_task(task_id8).priority(), + rt.task(t8).priority(), Priority::from_user_priority(0.into()).add_inverted_priority_u32(123) ); } #[test] fn test_submit_jobs() { - let mut core = Core::default(); + let mut rt = TestEnv::new(); let mut comm = create_test_comm(); - //new_workers(&mut core, &mut comm, vec![1]); - let rmap = core.get_resource_map_mut(); - let t1 = task(501, rmap); - let t2 = task_with_deps(502, &[&t1], rmap); - on_new_tasks(&mut core, &mut comm, vec![t1, t2]); + + let rmap = rt.core().resource_map_mut(); + let t1 = TaskBuilder::new().build(501, rmap); + let t2 = TaskBuilder::new().task_deps(&[t1.id]).build(502, rmap); + + on_new_tasks(rt.core(), &mut comm, vec![t1, t2]); comm.check_need_scheduling(); comm.emptiness_check(); - let t1 = core.get_task(501.into()); - let t2 = core.get_task(502.into()); - assert_eq!(t1.get_unfinished_deps(), 0); - assert_eq!(t2.get_unfinished_deps(), 1); + let task1 = rt.task(501); + let task2 = rt.task(502); + assert_eq!(task1.get_unfinished_deps(), 0); + assert_eq!(task2.get_unfinished_deps(), 1); + + check_task_consumers_exact(task1, &[task2]); - check_task_consumers_exact(t1, &[t2]); + let rmap = rt.core().resource_map_mut(); + let t3 = TaskBuilder::new().build(604, rmap); + let t4 = TaskBuilder::new() + .task_deps(&[TaskId::new_test(501), t3.id]) + .build(602, rmap); + let t5 = TaskBuilder::new().task_deps(&[t3.id]).build(603, rmap); + let t6 = TaskBuilder::new() + .task_deps(&[t3.id, t4.id, t5.id, TaskId::new_test(502)]) + .build(601, rmap); - let (tasks, rmap) = core.split_tasks_resource_map_mut(); - let t1 = tasks.get_task(501.into()); - let t2 = tasks.get_task(502.into()); - let t3 = task(604, rmap); - let t4 = task_with_deps(602, &[t1, &t3], rmap); - let t5 = task_with_deps(603, &[&t3], rmap); - let t6 = task_with_deps(601, &[&t3, &t4, &t5, t2], rmap); + on_new_tasks(rt.core(), &mut comm, vec![t3, t4, t5, t6]); - on_new_tasks(&mut core, &mut comm, vec![t3, t4, t5, t6]); comm.check_need_scheduling(); comm.emptiness_check(); - let t1 = core.get_task(501.into()); - let t2 = core.get_task(502.into()); - let t4 = core.get_task(602.into()); - let t6 = core.get_task(601.into()); + let t1 = rt.task(501); + let t2 = rt.task(502); + let t4 = rt.task(602); + let t6 = rt.task(601); check_task_consumers_exact(t1, &[t2, t4]); assert_eq!(t1.get_unfinished_deps(), 0); @@ -238,8 +241,8 @@ fn no_data_task_finished(task_id: u32) -> TaskFinishedMsg { #[test] fn test_assignments_and_finish() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); /* t1 t2 t4 t5 @@ -247,35 +250,31 @@ fn test_assignments_and_finish() { t3[k] t7[k] */ - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(11).user_priority(12).build(rmap); - let t2 = task(12, rmap); - let t3 = task_with_deps(13, &[&t1, &t2], rmap); - let t4 = task(14, rmap); - let t5 = task(15, rmap); - let t7 = task_with_deps(17, &[&t4], rmap); - - let (id1, id2, id3, id5, id7) = (t1.id, t2.id, t3.id, t5.id, t7.id); + let t1 = rt.new_task(11, &TaskBuilder::new().user_priority(12)); + let t2 = rt.new_task_default(12); + let t3 = rt.new_task(13, &TaskBuilder::new().task_deps(&[t1, t2])); + let t4 = rt.new_task_default(14); + let t5 = rt.new_task_default(15); + let t7 = rt.new_task(17, &TaskBuilder::new().task_deps(&[t4])); - submit_test_tasks(&mut core, vec![t1, t2, t3, t4, 
t5, t7]); let mut comm = create_test_comm(); let mut scheduler = create_test_scheduler(); - force_assign(&mut core, &mut scheduler, 11, 100); - force_assign(&mut core, &mut scheduler, 12, 101); - force_assign(&mut core, &mut scheduler, 15, 100); + force_assign(rt.core(), &mut scheduler, 11, 100); + force_assign(rt.core(), &mut scheduler, 12, 101); + force_assign(rt.core(), &mut scheduler, 15, 100); - core.assert_fresh(&[id1, id3]); + rt.core().assert_fresh(&[t1, t3]); - scheduler.finish_scheduling(&mut core, &mut comm); + scheduler.finish_scheduling(rt.core(), &mut comm); - core.assert_not_fresh(&[id1]); - core.assert_fresh(&[id3]); + rt.core().assert_not_fresh(&[t1]); + rt.core().assert_fresh(&[t3]); - check_worker_tasks_exact(&core, 100, &[id1, id5]); - check_worker_tasks_exact(&core, 101, &[id2]); - check_worker_tasks_exact(&core, 102, &[]); + check_worker_tasks_exact(rt.core(), 100, &[t1, t5]); + check_worker_tasks_exact(rt.core(), 101, &[t2]); + check_worker_tasks_exact(rt.core(), 102, &[]); let msgs = comm.take_worker_msgs(100, 1); assert!(matches!( @@ -296,45 +295,45 @@ fn test_assignments_and_finish() { )); comm.emptiness_check(); - core.assert_assigned(&[id1, id2]); - core.assert_waiting(&[id3, id7]); + rt.core().assert_assigned(&[t1, t2]); + rt.core().assert_waiting(&[t3, t7]); - assert!(core.get_task(15.into()).is_assigned()); + assert!(rt.task(15).is_assigned()); // FINISH TASK WITHOUT CONSUMERS & KEEP FLAG - on_task_finished(&mut core, &mut comm, 100.into(), no_data_task_finished(15)); + on_task_finished(rt.core(), &mut comm, 100.into(), no_data_task_finished(15)); - assert!(core.find_task(15.into()).is_none()); - check_worker_tasks_exact(&core, 100, &[id1]); - check_worker_tasks_exact(&core, 101, &[id2]); - check_worker_tasks_exact(&core, 102, &[]); + assert!(rt.core().find_task(15.into()).is_none()); + check_worker_tasks_exact(rt.core(), 100, &[t1]); + check_worker_tasks_exact(rt.core(), 101, &[t2]); + check_worker_tasks_exact(rt.core(), 102, &[]); comm.check_need_scheduling(); assert_eq!(comm.client.take_task_finished(1)[0], TaskId::new_test(15)); comm.emptiness_check(); - assert!(core.find_task(15.into()).is_none()); + assert!(!rt.task_exists(15)); // FINISHED TASK WITH CONSUMERS - assert!(core.get_task(12.into()).is_assigned()); + assert!(rt.task(12).is_assigned()); - on_task_finished(&mut core, &mut comm, 101.into(), no_data_task_finished(12)); + on_task_finished(rt.core(), &mut comm, 101.into(), no_data_task_finished(12)); - assert!(core.find_task(12.into()).is_none()); - check_worker_tasks_exact(&core, 100, &[id1]); - check_worker_tasks_exact(&core, 101, &[]); - check_worker_tasks_exact(&core, 102, &[]); + assert!(!rt.task_exists(12)); + check_worker_tasks_exact(rt.core(), 100, &[t1]); + check_worker_tasks_exact(rt.core(), 101, &[]); + check_worker_tasks_exact(rt.core(), 102, &[]); comm.check_need_scheduling(); assert_eq!(comm.client.take_task_finished(1)[0], TaskId::new_test(12)); comm.emptiness_check(); - on_task_finished(&mut core, &mut comm, 100.into(), no_data_task_finished(11)); + on_task_finished(rt.core(), &mut comm, 100.into(), no_data_task_finished(11)); comm.check_need_scheduling(); - force_assign(&mut core, &mut scheduler, 13, 101); - scheduler.finish_scheduling(&mut core, &mut comm); + force_assign(rt.core(), &mut scheduler, 13, 101); + scheduler.finish_scheduling(rt.core(), &mut comm); let msgs = comm.take_worker_msgs(101, 1); assert!(matches!( @@ -347,9 +346,9 @@ fn test_assignments_and_finish() { assert_eq!(comm.client.take_task_finished(1)[0], 
TaskId::new_test(11)); comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); - on_task_finished(&mut core, &mut comm, 101.into(), no_data_task_finished(13)); + on_task_finished(rt.core(), &mut comm, 101.into(), no_data_task_finished(13)); comm.check_need_scheduling(); @@ -358,27 +357,24 @@ fn test_assignments_and_finish() { vec![TaskId::new_test(13)] ); comm.emptiness_check(); - core.sanity_check(); - - comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); } #[test] fn test_running_task_on_error() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); - submit_example_1(&mut core); - start_and_finish_on_worker(&mut core, 11, 100); - start_and_finish_on_worker(&mut core, 12, 101); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); + submit_example_1(&mut rt); + start_and_finish_on_worker(rt.core(), 11, 100); + start_and_finish_on_worker(rt.core(), 12, 101); - assign_to_worker(&mut core, 13, 102); - core.assert_assigned(&[13]); - assert!(worker_has_task(&core, 102, 13)); + assign_to_worker(rt.core(), 13, 102); + rt.core().assert_assigned(&[13]); + assert!(worker_has_task(rt.core(), 102, 13)); let mut comm = create_test_comm(); on_task_error( - &mut core, + rt.core(), &mut comm, 102.into(), 13.into(), @@ -386,7 +382,7 @@ fn test_running_task_on_error() { message: "".to_string(), }, ); - assert!(!worker_has_task(&core, 102, 13)); + assert!(!worker_has_task(rt.core(), 102, 13)); let mut msgs = comm.client.take_task_errors(1); let (id, cs, _) = msgs.pop().unwrap(); @@ -402,33 +398,33 @@ fn test_running_task_on_error() { comm.check_need_scheduling(); comm.emptiness_check(); - assert!(core.find_task(16.into()).is_none()); - assert!(core.find_task(15.into()).is_none()); - core.sanity_check(); + assert!(!rt.task_exists(16)); + assert!(!rt.task_exists(15)); + rt.sanity_check(); } #[test] fn test_steal_tasks_ok() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); - submit_example_1(&mut core); - start_and_finish_on_worker(&mut core, 11, 100); - start_and_finish_on_worker(&mut core, 12, 101); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); + submit_example_1(&mut rt); + start_and_finish_on_worker(rt.core(), 11, 100); + start_and_finish_on_worker(rt.core(), 12, 101); let task_id = 13; - assign_to_worker(&mut core, task_id, 101); + assign_to_worker(rt.core(), task_id, 101); - assert!(worker_has_task(&core, 101, task_id)); - assert!(!worker_has_task(&core, 100, task_id)); + assert!(worker_has_task(rt.core(), 101, task_id)); + assert!(!worker_has_task(rt.core(), 100, task_id)); let mut comm = create_test_comm(); let mut scheduler = create_test_scheduler(); - force_reassign(&mut core, &mut scheduler, task_id, 100); - scheduler.finish_scheduling(&mut core, &mut comm); + force_reassign(rt.core(), &mut scheduler, task_id, 100); + scheduler.finish_scheduling(rt.core(), &mut comm); - assert!(!worker_has_task(&core, 101, task_id)); - assert!(worker_has_task(&core, 100, task_id)); + assert!(!worker_has_task(rt.core(), 101, task_id)); + assert!(worker_has_task(rt.core(), 100, task_id)); let msgs = comm.take_worker_msgs(101, 1); assert!( @@ -437,7 +433,7 @@ fn test_steal_tasks_ok() { comm.emptiness_check(); on_steal_response( - &mut core, + rt.core(), &mut comm, 101.into(), StealResponseMsg { @@ -449,8 +445,8 @@ fn test_steal_tasks_ok() { }, ); - assert!(!worker_has_task(&core, 101, task_id)); - assert!(worker_has_task(&core, 100, task_id)); + assert!(!worker_has_task(rt.core(), 101, task_id)); + 
assert!(worker_has_task(rt.core(), 100, task_id)); let msgs = comm.take_worker_msgs(100, 1); assert!(matches!( @@ -459,23 +455,23 @@ fn test_steal_tasks_ok() { if tasks[0].id.job_task_id().as_num() == 13 )); comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); } #[test] fn test_steal_tasks_running() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); - submit_example_1(&mut core); - start_and_finish_on_worker(&mut core, 11, 100); - start_and_finish_on_worker(&mut core, 12, 101); - assign_to_worker(&mut core, 13, 101); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); + submit_example_1(&mut rt); + start_and_finish_on_worker(rt.core(), 11, 100); + start_and_finish_on_worker(rt.core(), 12, 101); + assign_to_worker(rt.core(), 13, 101); let mut comm = create_test_comm(); let mut scheduler = create_test_scheduler(); - force_reassign(&mut core, &mut scheduler, 13, 100); - scheduler.finish_scheduling(&mut core, &mut comm); + force_reassign(rt.core(), &mut scheduler, 13, 100); + scheduler.finish_scheduling(rt.core(), &mut comm); let msgs = comm.take_worker_msgs(101, 1); assert!( @@ -483,17 +479,17 @@ fn test_steal_tasks_running() { ); comm.emptiness_check(); - assert!(!worker_has_task(&core, 101, 13)); - assert!(worker_has_task(&core, 100, 13)); + assert!(!worker_has_task(rt.core(), 101, 13)); + assert!(worker_has_task(rt.core(), 100, 13)); - on_task_running(&mut core, &mut comm, 101.into(), task_running_msg(13)); + on_task_running(rt.core(), &mut comm, 101.into(), task_running_msg(13)); comm.client.take_task_running(1); comm.check_need_scheduling(); comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); on_steal_response( - &mut core, + rt.core(), &mut comm, 101.into(), StealResponseMsg { @@ -501,64 +497,59 @@ fn test_steal_tasks_running() { }, ); - assert!(worker_has_task(&core, 101, 13)); - assert!(!worker_has_task(&core, 100, 13)); - core.sanity_check(); + assert!(worker_has_task(rt.core(), 101, 13)); + assert!(!worker_has_task(rt.core(), 100, 13)); + rt.sanity_check(); comm.emptiness_check(); } #[test] #[should_panic] fn finish_unassigned_task() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); - submit_example_1(&mut core); - finish_on_worker(&mut core, 11, 100); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); + submit_example_1(&mut rt); + finish_on_worker(rt.core(), 11, 100); } #[test] fn finish_task_without_outputs() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1]); - let rmap = core.get_resource_map_mut(); - let t1 = task_with_deps(1, &[], rmap); - submit_test_tasks(&mut core, vec![t1]); - assign_to_worker(&mut core, 1, 100); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1]); + rt.new_task_assigned(1, &TaskBuilder::new(), 100); let mut comm = create_test_comm(); - on_task_finished(&mut core, &mut comm, 100.into(), no_data_task_finished(1)); + on_task_finished(rt.core(), &mut comm, 100.into(), no_data_task_finished(1)); comm.check_need_scheduling(); assert_eq!(comm.client.take_task_finished(1)[0], TaskId::new_test(1)); comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); } #[test] fn test_task_cancel() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); - submit_example_1(&mut core); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); + submit_example_1(&mut rt); - let rmap = core.get_resource_map_mut(); - let t40 = task(40, rmap); - let t41 = task(41, rmap); - let t42 = 
task(42, rmap); - - submit_test_tasks(&mut core, vec![t40, t41, t42]); - assign_to_worker(&mut core, 11, 101); - assign_to_worker(&mut core, 12, 101); - assign_to_worker(&mut core, 40, 101); - assign_to_worker(&mut core, 41, 100); - - start_stealing(&mut core, 12, 100); - set_as_running(&mut core, 12, 101); - fail_steal(&mut core, 12, 101); - start_stealing(&mut core, 40, 100); - start_stealing(&mut core, 41, 101); + rt.new_task_default(40); + rt.new_task_default(41); + rt.new_task_default(42); + + assign_to_worker(rt.core(), 11, 101); + assign_to_worker(rt.core(), 12, 101); + assign_to_worker(rt.core(), 40, 101); + assign_to_worker(rt.core(), 41, 100); + + start_stealing(rt.core(), 12, 100); + set_as_running(rt.core(), 12, 101); + fail_steal(rt.core(), 12, 101); + start_stealing(rt.core(), 40, 100); + start_stealing(rt.core(), 41, 101); let mut comm = create_test_comm(); on_cancel_tasks( - &mut core, + rt.core(), &mut comm, &vec![11, 12, 40, 41, 33] .into_iter() @@ -576,34 +567,32 @@ fn test_task_cancel() { matches!(&msgs[0], &ToWorkerMessage::CancelTasks(TaskIdsMsg { ref ids }) if sorted_vec(ids.clone()) == vec![TaskId::new_test(11), TaskId::new_test(12), TaskId::new_test(40)]) ); - assert_eq!(core.task_map().len(), 1); - assert!(core.find_task(42.into()).is_some()); + assert_eq!(rt.core().task_map().len(), 1); + assert!(rt.core().find_task(42.into()).is_some()); comm.check_need_scheduling(); comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); } #[test] fn test_worker_lost_with_mn_task_non_root() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1, 1]); - let rmap = core.get_resource_map_mut(); - let task1 = TaskBuilder::new(1).n_nodes(3).build(rmap); - submit_test_tasks(&mut core, vec![task1]); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1, 1]); + rt.new_task(1, &TaskBuilder::new().n_nodes(3)); start_mn_task_on_worker( - &mut core, + rt.core(), TaskId::new_test(1), vec![WorkerId::new(103), WorkerId::new(101), WorkerId::new(100)], ); let mut comm = create_test_comm(); on_remove_worker( - &mut core, + rt.core(), &mut comm, 101.into(), LostWorkerReason::HeartbeatLost, ); - core.sanity_check(); + rt.sanity_check(); assert_eq!(comm.client.take_lost_workers().len(), 1); let msgs = comm.take_worker_msgs(103, 1); assert!( @@ -614,12 +603,12 @@ fn test_worker_lost_with_mn_task_non_root() { ToWorkerMessage::LostWorker(w) if w.as_num() == 101)); comm.check_need_scheduling(); comm.emptiness_check(); - assert!(core.get_task(TaskId::new_test(1)).is_waiting()); + assert!(rt.task(TaskId::new_test(1)).is_waiting()); } #[test] fn test_worker_lost_with_mn_task_root() { - let mut core = Core::default(); + /*let mut core = Core::default(); create_test_workers(&mut core, &[1, 1, 1, 1]); let rmap = core.get_resource_map_mut(); let task1 = TaskBuilder::new(1).n_nodes(3).build(rmap); @@ -644,24 +633,22 @@ fn test_worker_lost_with_mn_task_root() { )); comm.check_need_scheduling(); comm.emptiness_check(); - assert!(core.get_task(TaskId::new_test(1)).is_waiting()); + assert!(core.get_task(TaskId::new_test(1)).is_waiting());*/ } #[test] fn test_worker_crashing_task() { - let mut core = Core::default(); - let rmap = core.get_resource_map_mut(); - let t1 = task(1, rmap); - submit_test_tasks(&mut core, vec![t1]); - assert_eq!(core.get_task(TaskId::new_test(1)).crash_counter, 0); + let mut rt = TestEnv::new(); + rt.new_task_default(1); + assert_eq!(rt.task(TaskId::new_test(1)).crash_counter, 0); for x in 1..=5 { let mut comm = create_test_comm(); let worker_id 
= 100 + x; - create_test_worker(&mut core, worker_id.into(), 1); - start_on_worker_running(&mut core, 1, worker_id); + rt.new_worker_with_id(worker_id, &WorkerBuilder::new(1)); + start_on_worker_running(rt.core(), 1, worker_id); on_remove_worker( - &mut core, + rt.core(), &mut comm, worker_id.into(), LostWorkerReason::HeartbeatLost, @@ -683,7 +670,7 @@ fn test_worker_crashing_task() { "Task was running on a worker that was lost; the task has occurred 5 times in this situation and limit was reached." ); } else { - assert_eq!(core.get_task(TaskId::new_test(1)).crash_counter, x); + assert_eq!(rt.task(TaskId::new_test(1)).crash_counter, x); } comm.emptiness_check(); } @@ -691,19 +678,17 @@ fn test_worker_crashing_task() { #[test] fn test_task_mn_fail() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1, 1]); - let rmap = core.get_resource_map_mut(); - let task1 = TaskBuilder::new(1).n_nodes(3).build(rmap); - submit_test_tasks(&mut core, vec![task1]); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1, 1]); + rt.new_task(1, &TaskBuilder::new().n_nodes(3)); start_mn_task_on_worker( - &mut core, + rt.core(), TaskId::new_test(1), vec![WorkerId::new(103), WorkerId::new(101), WorkerId::new(100)], ); let mut comm = create_test_comm(); on_task_error( - &mut core, + rt.core(), &mut comm, 103.into(), 1.into(), @@ -711,16 +696,16 @@ fn test_task_mn_fail() { message: "".to_string(), }, ); - core.sanity_check(); + rt.sanity_check(); let msgs = comm.client.take_task_errors(1); assert_eq!(msgs[0].0, TaskId::new_test(1)); comm.check_need_scheduling(); comm.emptiness_check(); - assert!(core.find_task(1.into()).is_none()); + assert!(rt.core().find_task(1.into()).is_none()); for w in &[100, 101, 102, 103] { assert!( - core.get_worker_map() - .get_worker((*w).into()) + rt.core() + .get_worker_by_id_or_panic((*w).into()) .mn_task() .is_none() ); @@ -729,19 +714,17 @@ fn test_task_mn_fail() { #[test] fn test_task_mn_cancel() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1, 1]); - let rmap = core.get_resource_map_mut(); - let task1 = TaskBuilder::new(1).n_nodes(3).build(rmap); - submit_test_tasks(&mut core, vec![task1]); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1, 1]); + let _task1 = rt.new_task(1, &TaskBuilder::new().n_nodes(3)); start_mn_task_on_worker( - &mut core, + rt.core(), TaskId::new_test(1), vec![WorkerId::new(103), WorkerId::new(101), WorkerId::new(100)], ); let mut comm = create_test_comm(); - on_cancel_tasks(&mut core, &mut comm, &[TaskId::new_test(1)]); - core.sanity_check(); + on_cancel_tasks(rt.core(), &mut comm, &[TaskId::new_test(1)]); + rt.sanity_check(); let msgs = comm.take_worker_msgs(103, 1); assert!( matches!(&msgs[0], &ToWorkerMessage::CancelTasks(TaskIdsMsg { ref ids }) if ids == &vec![TaskId::new_test(1)]) @@ -749,48 +732,44 @@ fn test_task_mn_cancel() { comm.check_need_scheduling(); comm.emptiness_check(); let mut scheduler = create_test_scheduler(); - scheduler.run_scheduling(&mut core, &mut comm); - core.sanity_check(); + scheduler.run_scheduling(rt.core(), &mut comm); + rt.sanity_check(); comm.emptiness_check(); - assert!(core.find_task(1.into()).is_none()); - for w in core.get_workers() { + assert!(!rt.task_exists(1)); + for w in rt.core().get_workers() { assert!(w.mn_task.is_none()); } } #[test] fn test_running_task() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); - let rmap = core.get_resource_map_mut(); - let t1 = task(1, rmap); - let t2 = task(2, rmap); - 
submit_test_tasks(&mut core, vec![t1, t2]); - assign_to_worker(&mut core, 1, 101); - assign_to_worker(&mut core, 2, 101); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); + rt.new_task_assigned(1, &TaskBuilder::new(), 101); + rt.new_task_assigned(2, &TaskBuilder::new(), 101); let mut comm = create_test_comm(); comm.emptiness_check(); - on_task_running(&mut core, &mut comm, 101.into(), task_running_msg(1)); + on_task_running(rt.core(), &mut comm, 101.into(), task_running_msg(1)); assert_eq!(comm.client.take_task_running(1), vec![TaskId::new_test(1)]); comm.emptiness_check(); - on_task_running(&mut core, &mut comm, 101.into(), task_running_msg(2)); + on_task_running(rt.core(), &mut comm, 101.into(), task_running_msg(2)); assert_eq!(comm.client.take_task_running(1)[0], TaskId::new_test(2)); comm.emptiness_check(); assert!(matches!( - core.task(1).state, + rt.task(1).state, TaskRuntimeState::Running { worker_id, .. } if worker_id.as_num() == 101 )); assert!(matches!( - core.task(2).state, + rt.task(2).state, TaskRuntimeState::Running { worker_id, .. @@ -798,7 +777,7 @@ fn test_running_task() { )); on_remove_worker( - &mut core, + rt.core(), &mut comm, 101.into(), LostWorkerReason::HeartbeatLost, @@ -819,27 +798,25 @@ fn test_running_task() { #[test] fn test_finished_before_steal_response() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); - let rmap = core.get_resource_map_mut(); - let t1 = task(1, rmap); - submit_test_tasks(&mut core, vec![t1]); - assign_to_worker(&mut core, 1, 101); - start_stealing(&mut core, 1, 102); - assert!(worker_has_task(&core, 102, 1)); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); + rt.new_task_default(1); + assign_to_worker(rt.core(), 1, 101); + start_stealing(rt.core(), 1, 102); + assert!(worker_has_task(rt.core(), 102, 1)); let mut comm = create_test_comm(); - on_task_finished(&mut core, &mut comm, 101.into(), no_data_task_finished(1)); + on_task_finished(rt.core(), &mut comm, 101.into(), no_data_task_finished(1)); comm.check_need_scheduling(); assert_eq!(comm.client.take_task_finished(1)[0], TaskId::new_test(1)); comm.emptiness_check(); - assert!(!worker_has_task(&core, 101, 1)); - assert!(!worker_has_task(&core, 102, 1)); + assert!(!worker_has_task(rt.core(), 101, 1)); + assert!(!worker_has_task(rt.core(), 102, 1)); on_steal_response( - &mut core, + rt.core(), &mut comm, 101.into(), StealResponseMsg { @@ -849,32 +826,30 @@ fn test_finished_before_steal_response() { comm.emptiness_check(); - assert!(!worker_has_task(&core, 101, 1)); - assert!(!worker_has_task(&core, 102, 1)); + assert!(!worker_has_task(rt.core(), 101, 1)); + assert!(!worker_has_task(rt.core(), 102, 1)); } #[test] fn test_running_before_steal_response() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); - let rmap = core.get_resource_map_mut(); - let t1 = task(1, rmap); - submit_test_tasks(&mut core, vec![t1]); - assign_to_worker(&mut core, 1, 101); - start_stealing(&mut core, 1, 102); - assert!(worker_has_task(&core, 102, 1)); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); + rt.new_task_default(1); + assign_to_worker(rt.core(), 1, 101); + start_stealing(rt.core(), 1, 102); + assert!(worker_has_task(rt.core(), 102, 1)); let mut comm = create_test_comm(); - on_task_running(&mut core, &mut comm, 101.into(), task_running_msg(1)); + on_task_running(rt.core(), &mut comm, 101.into(), task_running_msg(1)); comm.check_need_scheduling(); assert_eq!(comm.client.take_task_running(1)[0], 
TaskId::new_test(1)); comm.emptiness_check(); - assert!(worker_has_task(&core, 101, 1)); - assert!(!worker_has_task(&core, 102, 1)); + assert!(worker_has_task(rt.core(), 101, 1)); + assert!(!worker_has_task(rt.core(), 102, 1)); on_steal_response( - &mut core, + rt.core(), &mut comm, 101.into(), StealResponseMsg { @@ -883,43 +858,40 @@ fn test_running_before_steal_response() { ); comm.emptiness_check(); - assert!(worker_has_task(&core, 101, 1)); - assert!(!worker_has_task(&core, 102, 1)); + assert!(worker_has_task(rt.core(), 101, 1)); + assert!(!worker_has_task(rt.core(), 102, 1)); } #[test] fn test_ready_to_assign_is_empty_after_cancel() { - let mut core = Core::default(); - let rmap = core.get_resource_map_mut(); - let t1 = task(1, rmap); - submit_test_tasks(&mut core, vec![t1]); - cancel_tasks(&mut core, &[1]); - assert!(core.take_single_node_ready_to_assign().is_empty()); + let mut rt = TestEnv::new(); + rt.new_task_default(1); + cancel_tasks(rt.core(), &[1]); + assert!(rt.core().take_single_node_ready_to_assign().is_empty()); } #[test] fn test_after_cancel_messages() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); - let rmap = core.get_resource_map_mut(); - let t1 = task(1, rmap); - let t2 = task(2, rmap); - let t3 = task(3, rmap); - let t4 = task(4, rmap); - submit_test_tasks(&mut core, vec![t1, t2, t3, t4]); - assign_to_worker(&mut core, 1, 101); - assign_to_worker(&mut core, 2, 101); - assign_to_worker(&mut core, 3, 101); - assign_to_worker(&mut core, 4, 101); - - cancel_tasks(&mut core, &[1, 2, 3, 4]); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); + rt.new_task_default(1); + rt.new_task_default(2); + rt.new_task_default(3); + rt.new_task_default(4); + + assign_to_worker(rt.core(), 1, 101); + assign_to_worker(rt.core(), 2, 101); + assign_to_worker(rt.core(), 3, 101); + assign_to_worker(rt.core(), 4, 101); + + cancel_tasks(rt.core(), &[1, 2, 3, 4]); let mut comm = create_test_comm(); - on_task_finished(&mut core, &mut comm, 101.into(), no_data_task_finished(1)); + on_task_finished(rt.core(), &mut comm, 101.into(), no_data_task_finished(1)); comm.emptiness_check(); on_steal_response( - &mut core, + rt.core(), &mut comm, 101.into(), StealResponseMsg { @@ -929,7 +901,7 @@ fn test_after_cancel_messages() { comm.emptiness_check(); on_steal_response( - &mut core, + rt.core(), &mut comm, 101.into(), StealResponseMsg { @@ -939,7 +911,7 @@ fn test_after_cancel_messages() { comm.emptiness_check(); on_task_error( - &mut core, + rt.core(), &mut comm, 101.into(), 3.into(), @@ -949,40 +921,39 @@ fn test_after_cancel_messages() { ); comm.emptiness_check(); - on_task_running(&mut core, &mut comm, 101.into(), task_running_msg(4)); + on_task_running(rt.core(), &mut comm, 101.into(), task_running_msg(4)); comm.emptiness_check(); } #[test] fn lost_worker_with_running_and_assign_tasks() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); - submit_example_1(&mut core); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); + submit_example_1(&mut rt); - let rmap = core.get_resource_map_mut(); - let t40 = task(40, rmap); - let t41 = task(41, rmap); - submit_test_tasks(&mut core, vec![t40, t41]); + rt.new_task_default(40); + rt.new_task_default(41); - assign_to_worker(&mut core, 11, 101); - assign_to_worker(&mut core, 12, 101); - assign_to_worker(&mut core, 40, 101); - assign_to_worker(&mut core, 41, 100); + assign_to_worker(rt.core(), 11, 101); + assign_to_worker(rt.core(), 12, 101); + assign_to_worker(rt.core(), 
40, 101); + assign_to_worker(rt.core(), 41, 100); - start_stealing(&mut core, 12, 100); - set_as_running(&mut core, 12, 101); - fail_steal(&mut core, 12, 101); - start_stealing(&mut core, 40, 100); - start_stealing(&mut core, 41, 101); + start_stealing(rt.core(), 12, 100); + set_as_running(rt.core(), 12, 101); + fail_steal(rt.core(), 12, 101); + start_stealing(rt.core(), 40, 100); + start_stealing(rt.core(), 41, 101); - core.assert_running(&[12]); - assert_eq!(core.get_task(12.into()).instance_id, 0.into()); + rt.core().assert_running(&[12]); + assert_eq!(rt.task(12).instance_id, 0.into()); - core.assert_task_condition(&[11, 12, 40, 41], |t| !t.is_fresh()); + rt.core() + .assert_task_condition(&[11, 12, 40, 41], |t| !t.is_fresh()); let mut comm = create_test_comm(); on_remove_worker( - &mut core, + rt.core(), &mut comm, 101.into(), LostWorkerReason::HeartbeatLost, @@ -993,14 +964,14 @@ fn lost_worker_with_running_and_assign_tasks() { vec![(WorkerId::new(101), vec![TaskId::new_test(12)])] ); - assert_eq!(core.take_single_node_ready_to_assign().len(), 3); - core.assert_ready(&[11, 12]); - assert_eq!(core.get_task(12.into()).instance_id, 1.into()); - assert!(core.get_task(40.into()).is_ready()); - core.assert_ready(&[40]); - core.assert_fresh(&[11, 12, 40]); + assert_eq!(rt.core().take_single_node_ready_to_assign().len(), 3); + rt.core().assert_ready(&[11, 12]); + assert_eq!(rt.task(12).instance_id, 1.into()); + assert!(rt.task(40).is_ready()); + rt.core().assert_ready(&[40]); + rt.core().assert_fresh(&[11, 12, 40]); assert!(matches!( - core.get_task(41.into()).state, + rt.task(41).state, TaskRuntimeState::Stealing(w, None) if w.as_num() == 100 )); @@ -1013,7 +984,7 @@ fn lost_worker_with_running_and_assign_tasks() { comm.emptiness_check(); on_steal_response( - &mut core, + rt.core(), &mut comm, 100.into(), StealResponseMsg { @@ -1021,14 +992,13 @@ fn lost_worker_with_running_and_assign_tasks() { }, ); - assert_eq!(core.take_single_node_ready_to_assign().len(), 1); - core.assert_ready(&[41]); - core.assert_fresh(&[41]); + assert_eq!(rt.core().take_single_node_ready_to_assign().len(), 1); + rt.core().assert_ready(&[41]); + rt.core().assert_fresh(&[41]); comm.check_need_scheduling(); comm.emptiness_check(); - - core.sanity_check(); + rt.sanity_check(); } fn force_reassign, T: Into>( @@ -1098,64 +1068,62 @@ fn check_task_consumers_exact(task: &Task, consumers: &[&Task]) { #[test] fn test_task_deps() { - let mut core = Core::default(); + let mut rt = TestEnv::new(); //create_test_workers(&mut core, &[1, 1, 1]); - submit_example_3(&mut core); - assert_eq!(core.get_ready_to_assign().len(), 2); - create_test_workers(&mut core, &[1]); - start_and_finish_on_worker(&mut core, 2, 100); - core.assert_waiting(&[3, 4, 6]); - core.assert_ready(&[5]); - - start_and_finish_on_worker(&mut core, 1, 100); - core.assert_waiting(&[6]); - core.assert_ready(&[3, 4, 5]); + submit_example_3(&mut rt); + assert_eq!(rt.core().get_ready_to_assign().len(), 2); + rt.new_workers_cpus(&[1]); + start_and_finish_on_worker(rt.core(), 2, 100); + rt.core().assert_waiting(&[3, 4, 6]); + rt.core().assert_ready(&[5]); + + start_and_finish_on_worker(rt.core(), 1, 100); + rt.core().assert_waiting(&[6]); + rt.core().assert_ready(&[3, 4, 5]); } #[test] fn test_worker_groups() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1]); - let g = core.worker_group("default").unwrap(); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1]); + let g = rt.core().worker_group("default").unwrap(); assert_eq!( 
sorted_vec(g.worker_ids().collect()), vec![WorkerId::new(100), WorkerId::new(101)] ); let mut comm = create_test_comm(); on_remove_worker( - &mut core, + rt.core(), &mut comm, 101.into(), LostWorkerReason::HeartbeatLost, ); - let g = core.worker_group("default").unwrap(); + let g = rt.core().worker_group("default").unwrap(); assert_eq!( sorted_vec(g.worker_ids().collect()), vec![WorkerId::new(100)] ); let mut comm = create_test_comm(); on_remove_worker( - &mut core, + rt.core(), &mut comm, 100.into(), LostWorkerReason::HeartbeatLost, ); - assert!(core.worker_group("default").is_none()); + assert!(rt.core().worker_group("default").is_none()); } #[test] fn test_data_deps_no_output() { - let mut core = Core::default(); - create_test_workers(&mut core, &[4]); - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(1).build(rmap); - let t2 = TaskBuilder::new(2).data_dep(&t1, 11).build(rmap); - submit_test_tasks(&mut core, vec![t1, t2]); - assign_to_worker(&mut core, 1, 100); - core.sanity_check(); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[4]); + let t1 = rt.new_task_default(1); + rt.new_task(2, &TaskBuilder::new().data_dep(t1, 11)); + assign_to_worker(rt.core(), 1, 100); + rt.sanity_check(); let mut comm = create_test_comm(); on_task_finished( - &mut core, + rt.core(), &mut comm, 100.into(), TaskFinishedMsg { @@ -1176,22 +1144,22 @@ fn test_data_deps_no_output() { #[test] fn test_data_deps_missing_outputs() { - let mut core = Core::default(); - create_test_workers(&mut core, &[4]); - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(1).build(rmap); - let t2 = TaskBuilder::new(2) - .data_dep(&t1, 10) - .data_dep(&t1, 11) - .data_dep(&t1, 100) - .data_dep(&t1, 101) - .build(rmap); - submit_test_tasks(&mut core, vec![t1, t2]); - assign_to_worker(&mut core, 1, 100); - core.sanity_check(); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[4]); + let t1 = rt.new_task_default(1); + let _t2 = rt.new_task( + 2, + &TaskBuilder::new() + .data_dep(t1, 10) + .data_dep(t1, 11) + .data_dep(t1, 100) + .data_dep(t1, 101), + ); + assign_to_worker(rt.core(), 1, 100); + rt.sanity_check(); let mut comm = create_test_comm(); on_task_finished( - &mut core, + rt.core(), &mut comm, 100.into(), TaskFinishedMsg { @@ -1236,24 +1204,19 @@ fn test_data_deps_missing_outputs() { #[test] fn test_data_deps_basic() { - let mut core = Core::default(); - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(1).build(rmap); - let t2 = TaskBuilder::new(2).data_dep(&t1, 0).build(rmap); - let t3 = TaskBuilder::new(3) - .data_dep(&t2, 123) - .data_dep(&t2, 478) - .build(rmap); - submit_test_tasks(&mut core, vec![t1, t2, t3]); - assert_eq!(core.get_task(2.into()).task_deps, [TaskId::new_test(1)]); - core.assert_waiting(&[2, 3]); - core.assert_ready(&[1]); - create_test_workers(&mut core, &[4]); + let mut rt = TestEnv::new(); + let t1 = rt.new_task_default(1); + let t2 = rt.new_task(2, &TaskBuilder::new().data_dep(t1, 0)); + let _t3 = rt.new_task(3, &TaskBuilder::new().data_dep(t2, 123).data_dep(t2, 478)); + assert_eq!(rt.task(2).task_deps, [TaskId::new_test(1)]); + rt.core().assert_waiting(&[2, 3]); + rt.core().assert_ready(&[1]); + rt.new_workers_cpus(&[4]); let mut comm = create_test_comm(); - assign_to_worker(&mut core, 1, 100); + assign_to_worker(rt.core(), 1, 100); on_task_finished( - &mut core, + rt.core(), &mut comm, 100.into(), TaskFinishedMsg { @@ -1268,17 +1231,17 @@ fn test_data_deps_basic() { comm.client.take_task_finished(1); comm.emptiness_check(); - 
core.assert_waiting(&[3]); - core.assert_ready(&[2]); + rt.core().assert_waiting(&[3]); + rt.core().assert_ready(&[2]); - assign_to_worker(&mut core, 2, 100); + assign_to_worker(rt.core(), 2, 100); - let o = core.dataobj_map(); + let o = rt.core().dataobj_map(); assert_eq!(o.len(), 1); o.get_data_object(DataObjectId::new(1.into(), 0.into())); on_task_finished( - &mut core, + rt.core(), &mut comm, 100.into(), TaskFinishedMsg { @@ -1302,9 +1265,9 @@ ); comm.client.take_task_finished(1); comm.emptiness_check(); - core.assert_ready(&[3]); + rt.core().assert_ready(&[3]); - let o = core.dataobj_map(); + let o = rt.core().dataobj_map(); assert_eq!(o.len(), 2); o.get_data_object(DataObjectId::new(2.into(), 123.into())); o.get_data_object(DataObjectId::new(2.into(), 478.into())); diff --git a/crates/tako/src/internal/tests/test_scheduler_mn.rs b/crates/tako/src/internal/tests/test_scheduler_mn.rs index 8ca4be994..96f933f85 100644 --- a/crates/tako/src/internal/tests/test_scheduler_mn.rs +++ b/crates/tako/src/internal/tests/test_scheduler_mn.rs @@ -1,16 +1,12 @@ use crate::internal::messages::worker::ToWorkerMessage; -use crate::internal::server::core::Core; use crate::internal::server::task::Task; use crate::internal::tests::utils::env::{TestComm, TestEnv, create_test_comm}; use crate::internal::tests::utils::resources::ResBuilder; -use crate::internal::tests::utils::schedule::{ - create_test_scheduler, create_test_worker, create_test_worker_config, create_test_workers, - finish_on_worker, new_test_worker, submit_test_tasks, -}; +use crate::internal::tests::utils::schedule::{create_test_scheduler, finish_on_worker}; +use crate::WorkerId; use crate::internal::tests::utils::task::TaskBuilder; -use crate::resources::{ResourceDescriptor, ResourceIdMap}; -use crate::{TaskId, WorkerId}; +use crate::tests::utils::worker::WorkerBuilder; use std::time::Duration; #[derive(Debug)] @@ -55,23 +51,16 @@ fn check_worker_status_change(s1: WorkerStatus, s2: WorkerStatus, ms: &[ToWorker #[test] fn test_schedule_mn_simple() { - let mut core = Core::default(); - create_test_workers(&mut core, &[5, 5, 5, 5, 5]); - let rmap = core.get_resource_map_mut(); - let tasks: Vec<Task> = (1..=4) - .map(|i| { - TaskBuilder::new(i) - .user_priority(i as i32) - .n_nodes(2) - .build(rmap) - }) - .collect(); - submit_test_tasks(&mut core, tasks); - core.sanity_check(); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[5, 5, 5, 5, 5]); + (1..=4).for_each(|i| { + rt.new_task(i, &TaskBuilder::new().user_priority(i as i32).n_nodes(2)); + }); + rt.sanity_check(); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); - scheduler.run_scheduling(&mut core, &mut comm); - core.sanity_check(); + scheduler.run_scheduling(rt.core(), &mut comm); + rt.sanity_check(); let test_mn_task = |task: &Task, comm: &mut TestComm| -> Vec<WorkerId> { let ws = task.mn_placement().unwrap().to_vec(); @@ -84,66 +73,60 @@ fn test_schedule_mn_simple() { ws }; - let task3 = core.get_task(3.into()); - let ws3 = test_mn_task(task3, &mut comm); - let task4 = core.get_task(4.into()); - let ws4 = test_mn_task(task4, &mut comm); + let ws3 = test_mn_task(rt.task(3), &mut comm); + let ws4 = test_mn_task(rt.task(4), &mut comm); for w in &ws4 { assert!(!ws3.contains(w)); } - assert!(core.get_task(2.into()).is_waiting()); - assert!(core.get_task(1.into()).is_waiting()); + assert!(rt.task(2).is_waiting()); + assert!(rt.task(1).is_waiting()); comm.emptiness_check(); - finish_on_worker(&mut core, 3, ws3[0]); - core.sanity_check(); + 
finish_on_worker(rt.core(), 3, ws3[0]); + rt.sanity_check(); - assert!(core.find_task(3.into()).is_none()); + assert!(!rt.task_exists(3)); for w in ws3 { - assert!(core.get_worker_by_id_or_panic(w).mn_task().is_none()); + assert!(rt.core().get_worker_by_id_or_panic(w).mn_task().is_none()); } - scheduler.run_scheduling(&mut core, &mut comm); - core.sanity_check(); + scheduler.run_scheduling(rt.core(), &mut comm); + rt.sanity_check(); - let task2 = core.get_task(2.into()); - let ws2 = test_mn_task(task2, &mut comm); + let ws2 = test_mn_task(rt.task(2), &mut comm); comm.emptiness_check(); - finish_on_worker(&mut core, 3, ws2[0]); - core.sanity_check(); + finish_on_worker(rt.core(), 3, ws2[0]); + rt.sanity_check(); } #[test] fn test_schedule_mn_reserve() { - let mut core = Core::default(); - create_test_workers(&mut core, &[1, 1, 1]); - - let rmap = core.get_resource_map_mut(); - let task1 = TaskBuilder::new(1).user_priority(10).n_nodes(3).build(rmap); - let task2 = TaskBuilder::new(2).user_priority(5).n_nodes(2).build(rmap); - let task3 = TaskBuilder::new(3).user_priority(0).n_nodes(3).build(rmap); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[1, 1, 1]); - submit_test_tasks(&mut core, vec![task1, task2, task3]); - core.sanity_check(); + rt.new_task(1, &TaskBuilder::new().user_priority(10).n_nodes(3)); + rt.new_task(2, &TaskBuilder::new().user_priority(5).n_nodes(2)); + rt.new_task(3, &TaskBuilder::new().user_priority(0).n_nodes(3)); + rt.sanity_check(); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); - scheduler.run_scheduling(&mut core, &mut comm); - core.sanity_check(); + scheduler.run_scheduling(rt.core(), &mut comm); + rt.sanity_check(); - let ws1 = core.get_task(1.into()).mn_placement().unwrap().to_vec(); + let ws1 = rt.task(1).mn_placement().unwrap().to_vec(); assert!(matches!( comm.take_worker_msgs(ws1[0], 1)[0], ToWorkerMessage::ComputeTasks(_) )); - assert!(!core.get_worker_by_id_or_panic(ws1[0]).is_reserved()); - assert!(core.get_worker_by_id_or_panic(ws1[1]).is_reserved()); - assert!(core.get_worker_by_id_or_panic(ws1[2]).is_reserved()); + assert!(!rt.core().get_worker_by_id_or_panic(ws1[0]).is_reserved()); + assert!(rt.core().get_worker_by_id_or_panic(ws1[1]).is_reserved()); + assert!(rt.core().get_worker_by_id_or_panic(ws1[2]).is_reserved()); comm.emptiness_check(); - finish_on_worker(&mut core, 1, ws1[0]); - scheduler.run_scheduling(&mut core, &mut comm); + finish_on_worker(rt.core(), 1, ws1[0]); + scheduler.run_scheduling(rt.core(), &mut comm); - let ws2 = core.get_task(2.into()).mn_placement().unwrap().to_vec(); + let ws2 = rt.task(2).mn_placement().unwrap().to_vec(); for w in &[100, 101, 102] { let s1 = get_worker_status(&ws1, (*w).into()); let s2 = get_worker_status(&ws2, (*w).into()); @@ -151,11 +134,11 @@ fn test_schedule_mn_reserve() { check_worker_status_change(s1, s2, ms.as_slice()); } comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); - finish_on_worker(&mut core, 2, ws2[0]); - scheduler.run_scheduling(&mut core, &mut comm); - let ws3 = core.get_task(3.into()).mn_placement().unwrap().to_vec(); + finish_on_worker(rt.core(), 2, ws2[0]); + scheduler.run_scheduling(rt.core(), &mut comm); + let ws3 = rt.task(3).mn_placement().unwrap().to_vec(); for w in &[100, 101, 102] { let s1 = get_worker_status(&ws2, (*w).into()); @@ -164,10 +147,10 @@ fn test_schedule_mn_reserve() { check_worker_status_change(s1, s2, ms.as_slice()); } comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); - finish_on_worker(&mut 
core, 3, ws3[0]); - scheduler.run_scheduling(&mut core, &mut comm); + finish_on_worker(rt.core(), 3, ws3[0]); + scheduler.run_scheduling(rt.core(), &mut comm); for w in &[100, 101, 102] { let s = get_worker_status(&ws3, (*w).into()); @@ -175,62 +158,55 @@ fn test_schedule_mn_reserve() { check_worker_status_change(s, WorkerStatus::None, ms.as_slice()); } comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); } #[test] fn test_schedule_mn_fill() { - let mut core = Core::default(); + let mut rt = TestEnv::new(); let mut comm = create_test_comm(); - - create_test_workers( - &mut core, - &[/* 11 workers */ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], - ); - let rmap = core.get_resource_map_mut(); - let task1 = TaskBuilder::new(1).n_nodes(3).build(rmap); - let task2 = TaskBuilder::new(2).n_nodes(5).build(rmap); - let task3 = TaskBuilder::new(3).n_nodes(1).build(rmap); - let task4 = TaskBuilder::new(4).n_nodes(2).build(rmap); - submit_test_tasks(&mut core, vec![task1, task2, task3, task4]); + rt.new_workers_cpus(&[/* 11 workers */ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + rt.new_task(1, &TaskBuilder::new().n_nodes(3)); + rt.new_task(2, &TaskBuilder::new().n_nodes(5)); + rt.new_task(3, &TaskBuilder::new().n_nodes(1)); + rt.new_task(4, &TaskBuilder::new().n_nodes(2)); let mut scheduler = create_test_scheduler(); - scheduler.run_scheduling(&mut core, &mut comm); - core.sanity_check(); - for w in core.get_workers() { + scheduler.run_scheduling(rt.core(), &mut comm); + rt.sanity_check(); + for w in rt.core().get_workers() { assert!(w.mn_task().is_some()); } for t in &[1, 2, 3, 4] { - assert!(core.get_task(TaskId::new_test(*t)).is_mn_running()); + assert!(rt.task(*t).is_mn_running()); } } #[test] fn test_mn_not_enough() { - let mut core = Core::default(); + let mut rt = TestEnv::new(); let mut comm = create_test_comm(); - create_test_workers(&mut core, &[4]); - let rmap = core.get_resource_map_mut(); - let task1 = TaskBuilder::new(1).n_nodes(3).build(rmap); - let task2 = TaskBuilder::new(2).n_nodes(5).build(rmap); - let task3 = TaskBuilder::new(3).n_nodes(11).build(rmap); - let task4 = TaskBuilder::new(4).n_nodes(2).build(rmap); + rt.new_workers_cpus(&[4]); + rt.new_task(1, &TaskBuilder::new().n_nodes(3)); + rt.new_task(2, &TaskBuilder::new().n_nodes(5)); + rt.new_task(3, &TaskBuilder::new().n_nodes(11)); + rt.new_task(4, &TaskBuilder::new().n_nodes(2)); + let rmap = rt.core().resource_map_mut(); let r1 = rmap.get_resource_rq_id(&ResBuilder::default().n_nodes(3).finish_v()); let r2 = rmap.get_resource_rq_id(&ResBuilder::default().n_nodes(5).finish_v()); let r3 = rmap.get_resource_rq_id(&ResBuilder::default().n_nodes(11).finish_v()); let r4 = rmap.get_resource_rq_id(&ResBuilder::default().n_nodes(2).finish_v()); - submit_test_tasks(&mut core, vec![task1, task2, task3, task4]); let mut scheduler = create_test_scheduler(); - scheduler.run_scheduling(&mut core, &mut comm); - core.sanity_check(); - for w in core.get_workers() { + scheduler.run_scheduling(rt.core(), &mut comm); + rt.sanity_check(); + for w in rt.core().get_workers() { assert!(w.mn_task().is_none()); } for t in &[1, 2, 3, 4] { - assert!(core.get_task(TaskId::new_test(*t)).is_waiting()); + assert!(rt.task(*t).is_waiting()); } - let (mn_queue, _, _) = core.multi_node_queue_split(); + let (mn_queue, _, _) = rt.core().multi_node_queue_split(); assert!(mn_queue.is_sleeping(r1)); assert!(mn_queue.is_sleeping(r2)); assert!(mn_queue.is_sleeping(r3)); @@ -239,108 +215,88 @@ fn test_mn_not_enough() { #[test] fn test_mn_sleep_wakeup_one_by_one() { - 
let mut core = Core::default(); + let mut rt = TestEnv::new(); let mut comm = create_test_comm(); - let rmap = core.get_resource_map_mut(); - let task1 = TaskBuilder::new(1).n_nodes(4).user_priority(10).build(rmap); - submit_test_tasks(&mut core, vec![task1]); - - create_test_workers(&mut core, &[4, 1]); + rt.new_task(1, &TaskBuilder::new().n_nodes(4).user_priority(10)); + rt.new_workers_cpus(&[4, 1]); let mut scheduler = create_test_scheduler(); - scheduler.run_scheduling(&mut core, &mut comm); - core.sanity_check(); - assert!(core.task_map().get_task(1.into()).is_waiting()); - - let rmap = core.get_resource_map_mut(); - let task2 = TaskBuilder::new(2).n_nodes(2).user_priority(1).build(rmap); - submit_test_tasks(&mut core, vec![task2]); - scheduler.run_scheduling(&mut core, &mut comm); - core.sanity_check(); - assert!(core.task_map().get_task(1.into()).is_waiting()); - assert!(core.task_map().get_task(2.into()).is_mn_running()); - - let w = core.task_map().get_task(2.into()).mn_root_worker().unwrap(); - finish_on_worker(&mut core, 2, w); - create_test_worker(&mut core, 500.into(), 1); - create_test_worker(&mut core, 501.into(), 1); - scheduler.run_scheduling(&mut core, &mut comm); - core.sanity_check(); - assert!(core.task_map().get_task(1.into()).is_mn_running()); + scheduler.run_scheduling(rt.core(), &mut comm); + rt.sanity_check(); + assert!(rt.task(1).is_waiting()); + + rt.new_task(2, &TaskBuilder::new().n_nodes(2).user_priority(1)); + scheduler.run_scheduling(rt.core(), &mut comm); + rt.sanity_check(); + assert!(rt.task(1).is_waiting()); + assert!(rt.task(2).is_mn_running()); + + let w = rt.task(2).mn_root_worker().unwrap(); + finish_on_worker(rt.core(), 2, w); + rt.new_worker_with_id(500, &WorkerBuilder::new(1)); + rt.new_worker_with_id(501, &WorkerBuilder::new(1)); + scheduler.run_scheduling(rt.core(), &mut comm); + rt.sanity_check(); + assert!(rt.task_map().get_task(1.into()).is_mn_running()); } #[test] fn test_mn_sleep_wakeup_at_once() { - let mut core = Core::default(); + let mut rt = TestEnv::new(); let mut comm = create_test_comm(); - - create_test_workers(&mut core, &[4, 1]); - let rmap = core.get_resource_map_mut(); - let task1 = TaskBuilder::new(1).n_nodes(4).user_priority(10).build(rmap); - let task2 = TaskBuilder::new(2).n_nodes(2).user_priority(1).build(rmap); - submit_test_tasks(&mut core, vec![task1, task2]); + rt.new_workers_cpus(&[4, 1]); + rt.new_task(1, &TaskBuilder::new().n_nodes(4).user_priority(10)); + rt.new_task(2, &TaskBuilder::new().n_nodes(2).user_priority(1)); let mut scheduler = create_test_scheduler(); - scheduler.run_scheduling(&mut core, &mut comm); - core.sanity_check(); - assert!(core.task_map().get_task(1.into()).is_waiting()); - assert!(core.task_map().get_task(2.into()).is_mn_running()); + scheduler.run_scheduling(rt.core(), &mut comm); + rt.sanity_check(); + assert!(rt.task(1).is_waiting()); + assert!(rt.task(2).is_mn_running()); } #[test] fn test_mn_schedule_on_groups() { - let mut core = Core::default(); - - let resource_map = ResourceIdMap::from_vec(vec!["cpus".to_string()]); - let worker_id = WorkerId::new(100); - let mut wcfg1 = create_test_worker_config(worker_id, ResourceDescriptor::simple_cpus(1)); - wcfg1.group = "group1".to_string(); - new_test_worker(&mut core, worker_id, wcfg1, &resource_map); + let mut rt = TestEnv::new(); - let worker_id = WorkerId::new(101); - let mut wcfg2 = create_test_worker_config(worker_id, ResourceDescriptor::simple_cpus(1)); - wcfg2.group = "group2".to_string(); - new_test_worker(&mut core, worker_id, wcfg2, 
&resource_map); + rt.new_worker(&WorkerBuilder::new(1).group("group1")); + rt.new_worker(&WorkerBuilder::new(1).group("group2")); let mut comm = create_test_comm(); - let rmap = core.get_resource_map_mut(); - let task1 = TaskBuilder::new(1).n_nodes(2).build(rmap); - submit_test_tasks(&mut core, vec![task1]); + rt.new_task(1, &TaskBuilder::new().n_nodes(2)); let mut scheduler = create_test_scheduler(); - scheduler.run_scheduling(&mut core, &mut comm); - core.sanity_check(); - assert!(core.task_map().get_task(1.into()).is_waiting()); + scheduler.run_scheduling(rt.core(), &mut comm); + rt.sanity_check(); + assert!(rt.task(1).is_waiting()); } #[test] fn test_schedule_mn_time_request1() { let mut rt = TestEnv::new(); - rt.new_workers_ext(&[ - (1, None, Vec::new()), - (1, Some(Duration::new(29_999, 0)), Vec::new()), - (1, Some(Duration::new(30_001, 0)), Vec::new()), - ]); - rt.new_task(TaskBuilder::new(1).n_nodes(3).time_request(30_000)); + + rt.new_worker(&WorkerBuilder::new(1)); + rt.new_worker(&WorkerBuilder::new(1).time_limit(Duration::new(29_999, 0))); + rt.new_worker(&WorkerBuilder::new(1).time_limit(Duration::new(30_001, 0))); + + rt.new_task(1, &TaskBuilder::new().n_nodes(3).time_request(30_000)); rt.schedule(); - assert!(rt.task(1.into()).is_waiting()); + assert!(rt.task(1).is_waiting()); - rt.new_task(TaskBuilder::new(2).n_nodes(2).time_request(30_000)); + rt.new_task(2, &TaskBuilder::new().n_nodes(2).time_request(30_000)); rt.schedule(); - assert!(rt.task(1.into()).is_waiting()); - assert!(rt.task(2.into()).is_mn_running()); + assert!(rt.task(1).is_waiting()); + assert!(rt.task(2).is_mn_running()); } #[test] fn test_schedule_mn_time_request2() { let mut rt = TestEnv::new(); - rt.new_workers_ext(&[ - (1, Some(Duration::new(59_999, 0)), Vec::new()), - (1, Some(Duration::new(29_999, 0)), Vec::new()), - (1, Some(Duration::new(30_001, 0)), Vec::new()), - ]); - rt.new_task(TaskBuilder::new(1).n_nodes(3).time_request(23_998)); + rt.new_worker(&WorkerBuilder::new(1).time_limit(Duration::new(59_999, 0))); + rt.new_worker(&WorkerBuilder::new(1).time_limit(Duration::new(29_999, 0))); + rt.new_worker(&WorkerBuilder::new(1).time_limit(Duration::new(30_001, 0))); + rt.new_task(1, &TaskBuilder::new().n_nodes(3).time_request(23_998)); rt.schedule(); - assert!(rt.task(1.into()).is_mn_running()); + assert!(rt.task(1).is_mn_running()); } diff --git a/crates/tako/src/internal/tests/test_scheduler_sn.rs b/crates/tako/src/internal/tests/test_scheduler_sn.rs index 2fd3e47b2..c1c4bd581 100644 --- a/crates/tako/src/internal/tests/test_scheduler_sn.rs +++ b/crates/tako/src/internal/tests/test_scheduler_sn.rs @@ -4,16 +4,14 @@ use crate::internal::messages::worker::{ }; use crate::internal::server::core::Core; use crate::internal::server::reactor::on_steal_response; -use crate::internal::server::task::Task; use crate::internal::tests::utils::env::{TestEnv, create_test_comm}; use crate::internal::tests::utils::schedule::{ - create_test_scheduler, create_test_worker, create_test_workers, finish_on_worker, - start_and_finish_on_worker_with_data, submit_test_tasks, + create_test_scheduler, finish_on_worker, start_and_finish_on_worker_with_data, }; use crate::internal::tests::utils::task::TaskBuilder; -use crate::internal::tests::utils::task::task; use crate::internal::tests::utils::workflows::submit_example_4; -use crate::resources::{ResourceAmount, ResourceDescriptorItem, ResourceUnits}; +use crate::resources::{ResourceAmount, ResourceUnits}; +use crate::tests::utils::worker::WorkerBuilder; use crate::{JobId, 
TaskId}; use std::time::Duration; @@ -26,22 +24,22 @@ fn task_count(msg: &ToWorkerMessage) -> usize { #[test] fn test_no_deps_scattering_1() { - let mut core = Core::default(); - create_test_workers(&mut core, &[5, 5, 5]); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[5, 5, 5]); - let rmap = core.get_resource_map_mut(); - let tasks: Vec<Task> = (1..=4).map(|id| task(id, rmap)).collect(); - submit_test_tasks(&mut core, tasks); + (1..=4).for_each(|id| { + rt.new_task_default(id); + }); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); - scheduler.run_scheduling_without_balancing(&mut core, &mut comm); + scheduler.run_scheduling_without_balancing(rt.core(), &mut comm); let m1 = comm.take_worker_msgs(100, 0); let m2 = comm.take_worker_msgs(101, 0); let m3 = comm.take_worker_msgs(102, 0); comm.emptiness_check(); - core.sanity_check(); + rt.core().sanity_check(); let c1 = if m1.len() > 0 { task_count(&m1[0]) } else { 0 }; let c2 = if m2.len() > 0 { task_count(&m2[0]) } else { 0 }; @@ -53,17 +51,19 @@ #[test] fn test_no_deps_scattering_2() { - let mut core = Core::default(); - create_test_workers(&mut core, &[5, 5, 5]); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[5, 5, 5]); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); let mut submit_and_check = |id, expected| { - let rmap = core.get_resource_map_mut(); - let t = task(id, rmap); - submit_test_tasks(&mut core, vec![t]); - scheduler.run_scheduling_without_balancing(&mut core, &mut comm); - let mut counts: Vec<_> = core.get_workers().map(|w| w.sn_tasks().len()).collect(); + let _t = rt.new_task_default(id); + scheduler.run_scheduling_without_balancing(rt.core(), &mut comm); + let mut counts: Vec<_> = rt + .core() + .get_workers() + .map(|w| w.sn_tasks().len()) + .collect(); counts.sort(); assert_eq!(counts, expected); }; @@ -89,22 +89,22 @@ #[test] fn test_no_deps_distribute_without_balance() { - let mut core = Core::default(); - create_test_workers(&mut core, &[10, 10, 10]); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[10, 10, 10]); - let rmap = core.get_resource_map_mut(); - let tasks: Vec<Task> = (1..=150).map(|id| task(id, rmap)).collect(); - submit_test_tasks(&mut core, tasks); + (1..=150).for_each(|id| { + rt.new_task_default(id); + }); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); - let need_balance = scheduler.run_scheduling_without_balancing(&mut core, &mut comm); + let need_balance = scheduler.run_scheduling_without_balancing(rt.core(), &mut comm); let m1 = comm.take_worker_msgs(100, 1); let m2 = comm.take_worker_msgs(101, 1); let m3 = comm.take_worker_msgs(102, 1); comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); assert_eq!(task_count(&m1[0]), 50); assert_eq!(task_count(&m2[0]), 50); @@ -114,35 +114,31 @@ #[test] fn test_no_deps_distribute_with_balance() { - //setup_logging(); - let mut core = Core::default(); - create_test_workers(&mut core, &[2, 2, 2]); + let mut rt = TestEnv::new(); + rt.new_workers_cpus(&[2, 2, 2]); - assert_eq!(core.get_worker_map().len(), 3); - for w in core.get_workers() { + assert_eq!(rt.core().get_worker_map().len(), 3); + for w in rt.core().get_workers() { assert!(w.is_underloaded()); } - let mut active_ids: Set<TaskId> = (1..301).map(|id| id.into()).collect(); - let rmap = core.get_resource_map_mut(); - let tasks: Vec<Task> = (1..301).map(|id| task(id, rmap)).collect(); - 
submit_test_tasks(&mut core, tasks); + let mut active_ids: Set<TaskId> = (1..301).map(|id| rt.new_task_default(id)).collect(); let mut scheduler = create_test_scheduler(); let mut comm = create_test_comm(); - scheduler.run_scheduling(&mut core, &mut comm); + scheduler.run_scheduling(rt.core(), &mut comm); let m1 = comm.take_worker_msgs(100, 1); let m2 = comm.take_worker_msgs(101, 1); let m3 = comm.take_worker_msgs(102, 1); comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); assert!(matches!(&m1[0], ToWorkerMessage::ComputeTasks(msg) if msg.tasks.len() >= 29)); assert!(matches!(&m2[0], ToWorkerMessage::ComputeTasks(msg) if msg.tasks.len() >= 29)); assert!(matches!(&m3[0], ToWorkerMessage::ComputeTasks(msg) if msg.tasks.len() >= 29)); - for w in core.get_workers() { + for w in rt.core().get_workers() { assert!(!w.is_underloaded()); } @@ -160,15 +156,15 @@ fn test_no_deps_distribute_with_balance() { } }; - finish_all(&mut core, m1, 100); - finish_all(&mut core, m3, 102); + finish_all(rt.core(), m1, 100); + finish_all(rt.core(), m3, 102); - core.assert_underloaded(&[100, 102]); - core.assert_not_underloaded(&[101]); + rt.core().assert_underloaded(&[100, 102]); + rt.core().assert_not_underloaded(&[101]); - scheduler.run_scheduling(&mut core, &mut comm); + scheduler.run_scheduling(rt.core(), &mut comm); - core.assert_not_underloaded(&[100, 101, 102]); + rt.core().assert_not_underloaded(&[100, 101, 102]); // TODO: Finish stealing @@ -182,10 +178,10 @@ fn test_no_deps_distribute_with_balance() { }; comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); on_steal_response( - &mut core, + rt.core(), &mut comm, 101.into(), StealResponseMsg { @@ -200,26 +196,29 @@ fn test_no_deps_distribute_with_balance() { assert!(n3.len() > 5); assert_eq!(n1.len() + n3.len(), stealing.len()); - core.assert_not_underloaded(&[100, 101, 102]); + rt.core().assert_not_underloaded(&[100, 101, 102]); comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); - finish_all(&mut core, n1, 100); - finish_all(&mut core, n3, 102); + finish_all(rt.core(), n1, 100); + finish_all(rt.core(), n3, 102); assert_eq!( active_ids.len(), - core.get_worker_by_id_or_panic(101.into()).sn_tasks().len() + rt.core() + .get_worker_by_id_or_panic(101.into()) + .sn_tasks() + .len() ); comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); } #[test] fn test_resource_balancing1() { let mut rt = TestEnv::new(); - rt.new_workers(&[4, 6]); + rt.new_workers_cpus(&[4, 6]); // 10 - 3, 11 - 4, 12 - 2 rt.new_assigned_tasks_cpus(&[&[3, 4, 2]]); @@ -228,7 +227,7 @@ fn test_resource_balancing1() { rt.check_worker_tasks(101, &[11, 12]); let mut rt = TestEnv::new(); - rt.new_workers(&[6, 3]); + rt.new_workers_cpus(&[6, 3]); rt.new_assigned_tasks_cpus(&[&[3, 4, 2]]); rt.balance(); rt.check_worker_tasks(100, &[11, 12]); @@ -239,7 +238,7 @@ fn test_resource_balancing2() { let u = ResourceAmount::new_units; let mut rt = TestEnv::new(); - rt.new_workers(&[12, 12, 12]); + rt.new_workers_cpus(&[12, 12, 12]); rt.new_assigned_tasks_cpus(&[&[ 4, 4, 4, /* 12 */ 4, 4, 4, /* 12 */ 4, 4, 4, /* 12 */ @@ -254,7 +253,7 @@ fn test_resource_balancing3() { let u = ResourceAmount::new_units; let mut rt = TestEnv::new(); - rt.new_workers(&[12, 12, 12]); + rt.new_workers_cpus(&[12, 12, 12]); rt.new_assigned_tasks_cpus(&[ &[ @@ -273,7 +272,7 @@ fn test_resource_balancing4() { let u = ResourceAmount::new_units; let mut rt = TestEnv::new(); - 
rt.new_workers(&[12, 12, 12]); + rt.new_workers_cpus(&[12, 12, 12]); rt.new_assigned_tasks_cpus(&[&[ 2, 4, 2, 4, /* 12 */ 4, 4, 4, /* 12 */ 2, 2, 2, 2, 4, /* 12 */ @@ -288,7 +287,7 @@ fn test_resource_balancing4() { fn test_resource_balancing5() { let u = ResourceAmount::new_units; let mut rt = TestEnv::new(); - rt.new_workers(&[12, 12, 12]); + rt.new_workers_cpus(&[12, 12, 12]); rt.new_assigned_tasks_cpus(&[&[ 2, 4, 2, 4, 2, /* 14 */ 4, 4, 4, /* 12 */ 4, 4, 4, /* 12 */ @@ -302,7 +301,7 @@ fn test_resource_balancing6() { let _ = env_logger::builder().is_test(true).try_init(); let u = ResourceAmount::new_units; let mut rt = TestEnv::new(); - rt.new_workers(&[12, 12, 12, 12, 12]); + rt.new_workers_cpus(&[12, 12, 12, 12, 12]); rt.new_assigned_tasks_cpus(&[ &[2, 2, 4], &[4, 4, 4, 4], @@ -319,20 +318,20 @@ fn test_resource_balancing7() { let u = ResourceAmount::new_units; let mut rt = TestEnv::new(); - rt.new_workers(&[12, 12, 12]); + rt.new_workers_cpus(&[12, 12, 12]); - rt.new_task_running(TaskBuilder::new(10).cpus_compact(2), 100); - rt.new_task_running(TaskBuilder::new(11).cpus_compact(2), 100); - rt.new_task_running(TaskBuilder::new(12).cpus_compact(4), 100); + rt.new_task_running(10, &TaskBuilder::new().cpus(2), 100); + rt.new_task_running(11, &TaskBuilder::new().cpus(2), 100); + rt.new_task_running(12, &TaskBuilder::new().cpus(4), 100); - rt.new_task_running(TaskBuilder::new(20).cpus_compact(4), 101); - rt.new_task_assigned(TaskBuilder::new(21).cpus_compact(4), 101); // <- only assigned - rt.new_task_running(TaskBuilder::new(22).cpus_compact(4), 101); - rt.new_task_running(TaskBuilder::new(23).cpus_compact(4), 101); + rt.new_task_running(20, &TaskBuilder::new().cpus(4), 101); + rt.new_task_assigned(21, &TaskBuilder::new().cpus(4), 101); // <- only assigned + rt.new_task_running(22, &TaskBuilder::new().cpus(4), 101); + rt.new_task_running(23, &TaskBuilder::new().cpus(4), 101); - rt.new_task_running(TaskBuilder::new(30).cpus_compact(4), 102); - rt.new_task_running(TaskBuilder::new(31).cpus_compact(4), 102); - rt.new_task_running(TaskBuilder::new(33).cpus_compact(4), 102); + rt.new_task_running(30, &TaskBuilder::new().cpus(4), 102); + rt.new_task_running(31, &TaskBuilder::new().cpus(4), 102); + rt.new_task_running(33, &TaskBuilder::new().cpus(4), 102); rt.schedule(); rt.check_worker_load_lower_bounds(&[u(12), u(12), u(12)]); @@ -342,7 +341,7 @@ fn test_resource_balancing7() { fn test_resources_blocked_workers() { let u = ResourceAmount::new_units; let mut rt = TestEnv::new(); - rt.new_workers(&[4, 8, 2]); + rt.new_workers_cpus(&[4, 8, 2]); rt.new_assigned_tasks_cpus(&[&[4, 4, 4, 4, 4]]); rt.balance(); @@ -355,14 +354,14 @@ fn test_resources_blocked_workers() { assert!(rt.worker(102).is_parked()); rt.core().sanity_check(); - rt.new_ready_tasks_cpus(&[3]); + rt.new_tasks_cpus(&[3]); rt.schedule(); assert!(!rt.worker(100).is_parked()); assert!(!rt.worker(101).is_parked()); assert!(rt.worker(102).is_parked()); - rt.new_ready_tasks_cpus(&[1]); + rt.new_tasks_cpus(&[1]); rt.schedule(); assert!(!rt.worker(100).is_parked()); @@ -380,9 +379,9 @@ fn test_resources_blocked_workers() { fn test_resources_no_workers1() { let u = ResourceAmount::new_units; let mut rt = TestEnv::new(); - rt.new_workers(&[4, 8, 2]); + rt.new_workers_cpus(&[4, 8, 2]); - rt.new_ready_tasks_cpus(&[8, 8, 16, 24]); + rt.new_tasks_cpus(&[8, 8, 16, 24]); rt.schedule(); assert_eq!(rt.worker_load(100).get(0.into()), u(0)); assert_eq!(rt.worker_load(101).get(0.into()), u(16)); @@ -405,15 +404,15 @@ fn test_resources_no_workers2() { 
.unwrap() as u32 + rt.task_id_counter; - rt.new_workers(&[8, 8, 8]); + rt.new_workers_cpus(&[8, 8, 8]); - rt.new_ready_tasks_cpus(task_cpu_counts); + rt.new_tasks_cpus(task_cpu_counts); rt.schedule(); assert!(rt.worker_load(100).get(0.into()).is_zero()); assert!(rt.worker_load(101).get(0.into()).is_zero()); assert!(rt.worker_load(102).get(0.into()).is_zero()); - rt.new_workers(&[9, 10]); + rt.new_workers_cpus(&[9, 10]); rt.schedule(); assert!(rt.worker_load(100).get(0.into()).is_zero()); assert!(rt.worker_load(101).get(0.into()).is_zero()); @@ -438,11 +437,11 @@ fn test_resources_no_workers2() { fn test_resource_time_assign() { let mut rt = TestEnv::new(); - rt.new_workers_ext(&[(1, Some(Duration::new(100, 0)), Vec::new())]); + rt.new_worker(&WorkerBuilder::new(1).time_limit(Duration::new(100, 0))); - rt.new_task(TaskBuilder::new(10).time_request(170)); - rt.new_task(TaskBuilder::new(11)); - rt.new_task(TaskBuilder::new(12).time_request(99)); + rt.new_task(10, &TaskBuilder::new().time_request(170)); + rt.new_task_default(11); + rt.new_task(12, &TaskBuilder::new().time_request(99)); rt.schedule(); rt.finish_scheduling(); @@ -454,15 +453,13 @@ fn test_resource_time_balance1() { let _ = env_logger::builder().is_test(true).try_init(); let mut rt = TestEnv::new(); - rt.new_workers_ext(&[ - (1, Some(Duration::new(50, 0)), Vec::new()), - (1, Some(Duration::new(200, 0)), Vec::new()), - (1, Some(Duration::new(100, 0)), Vec::new()), - ]); + rt.new_worker(&WorkerBuilder::new(1).time_limit(Duration::new(50, 0))); + rt.new_worker(&WorkerBuilder::new(1).time_limit(Duration::new(200, 0))); + rt.new_worker(&WorkerBuilder::new(1).time_limit(Duration::new(100, 0))); - rt.new_task(TaskBuilder::new(10).time_request(170)); - rt.new_task(TaskBuilder::new(11)); - rt.new_task(TaskBuilder::new(12).time_request(99)); + rt.new_task(10, &TaskBuilder::new().time_request(170)); + rt.new_task_default(11); + rt.new_task(12, &TaskBuilder::new().time_request(99)); rt.test_assign(10, 101); rt.test_assign(11, 101); @@ -477,29 +474,25 @@ fn test_resource_time_balance1() { #[test] fn test_generic_resource_assign2() { - //let _ = env_logger::init(); let mut rt = TestEnv::new(); rt.new_generic_resource(2); - rt.new_workers_ext(&[ - // Worker 100 - (10, None, vec![ResourceDescriptorItem::range("Res0", 1, 10)]), - // Worker 101 - (10, None, vec![]), - // Worker 102 - ( - 10, - None, - vec![ - ResourceDescriptorItem::range("Res0", 1, 10), - ResourceDescriptorItem::sum("Res1", 1_000_000), - ], - ), - ]); + + // Worker 100 + rt.new_worker(&WorkerBuilder::new(10).res_range("Res0", 1, 10)); + // Worker 101 + rt.new_worker(&WorkerBuilder::new(10)); + // Worker 102 + rt.new_worker( + &WorkerBuilder::new(10) + .res_range("Res0", 1, 10) + .res_sum("Res1", 1_000_000), + ); + for i in 0..50 { - rt.new_task(TaskBuilder::new(i).add_resource(1, 1)); + rt.new_task(i, &TaskBuilder::new().add_resource(1, 1)); } for i in 50..100 { - rt.new_task(TaskBuilder::new(i).add_resource(2, 2)); + rt.new_task(i, &TaskBuilder::new().add_resource(2, 2)); } rt.schedule(); @@ -558,26 +551,22 @@ fn test_generic_resource_assign2() { fn test_generic_resource_balance1() { let mut rt = TestEnv::new(); rt.new_generic_resource(2); - rt.new_workers_ext(&[ - // Worker 100 - (10, None, vec![ResourceDescriptorItem::range("Res0", 1, 10)]), - // Worker 101 - (10, None, vec![]), - // Worker 102 - ( - 10, - None, - vec![ - ResourceDescriptorItem::range("Res0", 1, 10), - ResourceDescriptorItem::sum("Res1", 1_000_000), - ], - ), - ]); + // Worker 100 + 
rt.new_worker(&WorkerBuilder::new(10).res_range("Res0", 1, 10)); + // Worker 101 + rt.new_worker(&WorkerBuilder::new(10)); + // Worker 102 + rt.new_worker( + &WorkerBuilder::new(10) + .res_range("Res0", 1, 10) + .res_sum("Res1", 1_000_000), + ); - rt.new_task(TaskBuilder::new(1).cpus_compact(1).add_resource(1, 5)); - rt.new_task(TaskBuilder::new(2).cpus_compact(1).add_resource(1, 5)); - rt.new_task(TaskBuilder::new(3).cpus_compact(1).add_resource(1, 5)); - rt.new_task(TaskBuilder::new(4).cpus_compact(1).add_resource(1, 5)); + let t = TaskBuilder::new().cpus(1).add_resource(1, 5); + rt.new_task(1, &t); + rt.new_task(2, &t); + rt.new_task(3, &t); + rt.new_task(4, &t); rt.schedule(); assert_eq!( @@ -607,33 +596,30 @@ fn test_generic_resource_balance1() { fn test_generic_resource_balance2() { let mut rt = TestEnv::new(); rt.new_generic_resource(2); - rt.new_workers_ext(&[ - // Worker 100 - (10, None, vec![ResourceDescriptorItem::range("Res0", 1, 10)]), - // Worker 101 - (10, None, vec![]), - // Worker 102 - ( - 10, - None, - vec![ - ResourceDescriptorItem::range("Res0", 1, 10), - ResourceDescriptorItem::sum("Res1", 1_000_000), - ], - ), - ]); + // Worker 100 + rt.new_worker(&WorkerBuilder::new(10).res_range("Res0", 1, 10)); + // Worker 101 + rt.new_worker(&WorkerBuilder::new(10)); + // Worker 102 + rt.new_worker( + &WorkerBuilder::new(10) + .res_range("Res0", 1, 10) + .res_sum("Res1", 1_000_000), + ); - rt.new_task(TaskBuilder::new(1).cpus_compact(1).add_resource(1, 5)); + rt.new_task(1, &TaskBuilder::new().cpus(1).add_resource(1, 5)); rt.new_task( - TaskBuilder::new(2) - .cpus_compact(1) + 2, + &TaskBuilder::new() + .cpus(1) .add_resource(1, 5) .add_resource(2, 500_000), ); - rt.new_task(TaskBuilder::new(3).cpus_compact(1).add_resource(1, 5)); + rt.new_task(3, &TaskBuilder::new().cpus(1).add_resource(1, 5)); rt.new_task( - TaskBuilder::new(4) - .cpus_compact(1) + 4, + &TaskBuilder::new() + .cpus(1) .add_resource(1, 5) .add_resource(2, 500_000), ); @@ -666,18 +652,18 @@ fn test_generic_resource_balance2() { fn test_generic_resource_balancing3() { let mut rt = TestEnv::new(); rt.new_generic_resource(1); - rt.new_workers_ext(&[ - // Worker 100 - (2, None, vec![]), - // Worker 101 - (2, None, vec![ResourceDescriptorItem::range("Res0", 1, 1)]), - ]); + // Worker 100 + rt.new_worker(&WorkerBuilder::new(2)); + // Worker 101 + rt.new_worker(&WorkerBuilder::new(2).res_range("Res0", 1, 1)); for i in 1..=80 { - rt.new_task(TaskBuilder::new(i).cpus_compact(1)); + rt.new_task_default(i); } + + let t = TaskBuilder::new().cpus(1).add_resource(1, 1); for i in 81..=100 { - rt.new_task(TaskBuilder::new(i).cpus_compact(1).add_resource(1, 1)); + rt.new_task(i, &t); } rt.schedule(); @@ -702,20 +688,18 @@ fn test_generic_resource_balancing3() { fn test_generic_resource_variants1() { let mut rt = TestEnv::new(); rt.new_generic_resource(1); - rt.new_workers_ext(&[ - // Worker 100 - (4, None, vec![]), - // Worker 101 - (4, None, vec![ResourceDescriptorItem::range("Res0", 1, 2)]), - ]); - + // Worker 100 + rt.new_worker(&WorkerBuilder::new(4)); + // Worker 101 + rt.new_worker(&WorkerBuilder::new(4).res_range("Res0", 1, 2)); + + let task = TaskBuilder::new() + .cpus(2) + .next_resources() + .cpus(1) + .add_resource(1, 1); for i in 1..=4 { - let task = TaskBuilder::new(i) - .cpus_compact(2) - .next_resources() - .cpus_compact(1) - .add_resource(1, 1); - rt.new_task(task); + rt.new_task(i, &task); } rt.schedule(); @@ -739,20 +723,18 @@ fn test_generic_resource_variants1() { fn test_generic_resource_variants2() { let mut 
rt = TestEnv::new(); rt.new_generic_resource(1); - rt.new_workers_ext(&[ - // Worker 100 - (4, None, vec![]), - // Worker 101 - (4, None, vec![ResourceDescriptorItem::range("Res0", 1, 2)]), - ]); - + // Worker 100 + rt.new_worker(&WorkerBuilder::new(4)); + // Worker 101 + rt.new_worker(&WorkerBuilder::new(4).res_range("Res0", 1, 2)); + + let task = TaskBuilder::new() + .cpus(8) + .next_resources() + .cpus(2) + .add_resource(1, 1); for i in 1..=4 { - let task = TaskBuilder::new(i) - .cpus_compact(8) - .next_resources() - .cpus_compact(2) - .add_resource(1, 1); - rt.new_task(task); + rt.new_task(i, &task); } rt.schedule(); @@ -796,11 +778,11 @@ fn test_task_data_deps_initial_placing() { } for (worker1, worker2, size1, size2a, size2b, target_worker) in &test_data2 { - let mut core = Core::default(); - submit_example_4(&mut core); - create_test_workers(&mut core, &[1, 1]); + let mut rt = TestEnv::new(); + submit_example_4(&mut rt); + rt.new_workers_cpus(&[1, 1, 1]); start_and_finish_on_worker_with_data( - &mut core, + rt.core(), 1, *worker1, vec![TaskOutput { @@ -809,7 +791,7 @@ fn test_task_data_deps_initial_placing() { }], ); start_and_finish_on_worker_with_data( - &mut core, + rt.core(), 2, *worker2, vec![ @@ -823,16 +805,15 @@ fn test_task_data_deps_initial_placing() { }, ], ); - core.assert_ready(&[3]); + rt.core().assert_ready(&[3]); let mut comm = create_test_comm(); let mut scheduler = create_test_scheduler(); - scheduler.run_scheduling(&mut core, &mut comm); + scheduler.run_scheduling(rt.core(), &mut comm); assert_eq!( - core.get_task(3.into()).get_assigned_worker(), + rt.task(3).get_assigned_worker(), Some((*target_worker).into()) ); - //comm.emptiness_check(); - core.sanity_check(); + rt.sanity_check(); } } @@ -841,29 +822,27 @@ fn test_task_data_deps_balancing() { let _ = env_logger::builder().is_test(true).try_init(); for odd in [0u32, 1u32] { for late_worker in [true, false] { - let mut core = Core::default(); - let rmap = core.get_resource_map_mut(); - let t1 = TaskBuilder::new(1).build(rmap); - let t2 = TaskBuilder::new(2).build(rmap); + let mut rt = TestEnv::new(); + let t1 = rt.new_task_default(1); + let t2 = rt.new_task_default(2); let mut ts: Vec<_> = (10u32..110u32) .map(|i| { - TaskBuilder::new(TaskId::new_test(i)) - .data_dep(&t1, i - 10) - .data_dep(&t2, i - 10) - .build(rmap) + rt.new_task( + i, + &TaskBuilder::new().data_dep(t1, i - 10).data_dep(t2, i - 10), + ) }) .collect(); ts.insert(0, t1); ts.insert(0, t2); - submit_test_tasks(&mut core, ts); if late_worker { - create_test_workers(&mut core, &[1]); + rt.new_workers_cpus(&[1]); } else { - create_test_workers(&mut core, &[1, 1]); + rt.new_workers_cpus(&[1, 1]); } let mut set_data = |task_id: u32, worker_id: u32| { start_and_finish_on_worker_with_data( - &mut core, + rt.core(), task_id, worker_id, (0u32..100u32) @@ -878,16 +857,16 @@ fn test_task_data_deps_balancing() { set_data(2, 100); let ids: Vec<_> = (10..110).collect(); - core.assert_ready(&ids); + rt.core().assert_ready(&ids); if late_worker { - create_test_worker(&mut core, 101.into(), 1); + rt.new_worker_with_id(101, &WorkerBuilder::new(1)); } let mut comm = create_test_comm(); let mut scheduler = create_test_scheduler(); - scheduler.run_scheduling(&mut core, &mut comm); + scheduler.run_scheduling(rt.core(), &mut comm); - let worker = &core.get_worker_by_id(101.into()).unwrap(); - let n1_count = worker + let n1_count = rt + .worker(101) .sn_tasks() .iter() .map(|task_id| { @@ -907,24 +886,27 @@ fn test_task_data_deps_balancing() { fn 
test_resource_priority_balancing() { let mut rt = TestEnv::new(); // 0 1 2 3 4 5 6 7 8 9 10 - rt.new_workers(&[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 8]); + rt.new_workers_cpus(&[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 8]); let job_id = JobId::new(7); for i in 0..=9 { rt.new_task_running( - TaskBuilder::new(TaskId::new(job_id, (100 * i).into())).cpus_compact(4), + TaskId::new(job_id, (100 * i).into()), + &TaskBuilder::new().cpus(4), 100 + i, ); for j in 1..(4 + i) { rt.new_task_assigned( - TaskBuilder::new(TaskId::new(job_id, (100 * i + j).into())).cpus_compact(4), + TaskId::new(job_id, (100 * i + j).into()), + &TaskBuilder::new().cpus(4), 100 + i, ); } } for i in 1..=3 { rt.new_task_assigned( - TaskBuilder::new(TaskId::new(JobId::new(1), i.into())).cpus_compact(4), + TaskId::new(JobId::new(1), i.into()), + &TaskBuilder::new().cpus(4), 100, ); } diff --git a/crates/tako/src/internal/tests/utils/env.rs b/crates/tako/src/internal/tests/utils/env.rs index 391e5c658..518861a9c 100644 --- a/crates/tako/src/internal/tests/utils/env.rs +++ b/crates/tako/src/internal/tests/utils/env.rs @@ -2,7 +2,7 @@ use crate::events::EventProcessor; use crate::gateway::LostWorkerReason; use crate::internal::common::Map; use crate::internal::common::index::ItemId; -use crate::internal::common::resources::ResourceDescriptor; +use crate::internal::common::resources::ResourceId; use crate::internal::common::utils::format_comma_delimited; use crate::internal::messages::common::TaskFailInfo; use crate::internal::messages::worker::{ToWorkerMessage, WorkerOverview}; @@ -11,24 +11,19 @@ use crate::internal::server::comm::Comm; use crate::internal::server::core::Core; use crate::internal::server::reactor::on_new_worker; use crate::internal::server::task::Task; +use crate::internal::server::taskmap::TaskMap; use crate::internal::server::worker::Worker; use crate::internal::server::workerload::WorkerLoad; use crate::internal::tests::utils; -use crate::internal::tests::utils::resources::cpus_compact; use crate::internal::tests::utils::schedule; use crate::internal::tests::utils::task::TaskBuilder; use crate::internal::transfer::auth::{deserialize, serialize}; -use crate::internal::worker::configuration::{ - DEFAULT_MAX_DOWNLOAD_TRIES, DEFAULT_MAX_PARALLEL_DOWNLOADS, - DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, OverviewConfiguration, -}; -use crate::resources::{ - ResourceAmount, ResourceDescriptorItem, ResourceDescriptorKind, ResourceUnits, -}; +use crate::resources::{ResourceAmount, ResourceUnits}; use crate::task::SerializedTaskContext; -use crate::worker::{ServerLostPolicy, WorkerConfiguration}; +use crate::tests::utils::worker::WorkerBuilder; +use crate::worker::WorkerConfiguration; use crate::{InstanceId, ResourceVariantId, TaskId, WorkerId}; -use std::time::{Duration, Instant}; +use std::time::Instant; pub struct TestEnv { core: Core, @@ -57,15 +52,35 @@ impl TestEnv { &mut self.core } - pub fn task(&self, task_id: TaskId) -> &Task { - self.core.get_task(task_id) + pub fn task<T: Into<TaskId>>(&self, task_id: T) -> &Task { + self.core.get_task(task_id.into()) } - pub fn new_task(&mut self, builder: TaskBuilder) -> &Task { - let task = builder.build(self.core.get_resource_map_mut()); - let task_id = task.id; + pub fn task_exists<T: Into<TaskId>>(&self, task_id: T) -> bool { + self.core.find_task(task_id.into()).is_some() + } + + pub(crate) fn task_map(&self) -> &TaskMap { + self.core.task_map() + } + + pub(crate) fn sanity_check(&self) { + self.core.sanity_check(); + } + + pub fn new_task<T: Into<TaskId>>(&mut self, task_id: T, builder: &TaskBuilder) -> TaskId { + let task_id = 
task_id.into(); + let task = builder.build(task_id, self.core.resource_map_mut()); schedule::submit_test_tasks(&mut self.core, vec![task]); - self.task(task_id) + task_id + } + + pub fn new_task_cpus<T: Into<TaskId>>(&mut self, task_id: T, cpus: u32) -> TaskId { + self.new_task(task_id, &TaskBuilder::new().cpus(cpus)) + } + + pub fn new_task_default<T: Into<TaskId>>(&mut self, task_id: T) -> TaskId { + self.new_task(task_id, &TaskBuilder::new()) } pub fn new_generic_resource(&mut self, count: usize) { @@ -74,92 +89,76 @@ impl TestEnv { } } - pub fn new_task_assigned<W: Into<WorkerId>>(&mut self, builder: TaskBuilder, worker_id: W) { - let task = builder.build(self.core.get_resource_map_mut()); - let task_id = task.id(); - schedule::submit_test_tasks(&mut self.core, vec![task]); + pub fn new_named_resource(&mut self, name: &str) -> ResourceId { + self.core.get_or_create_resource_id(name) + } + + pub fn new_task_assigned<W: Into<WorkerId>, T: Into<TaskId>>( + &mut self, + task_id: T, + builder: &TaskBuilder, + worker_id: W, + ) { + let task_id = self.new_task(task_id, builder); schedule::assign_to_worker(&mut self.core, task_id, worker_id.into()); } - pub fn new_task_running<W: Into<WorkerId>>(&mut self, builder: TaskBuilder, worker_id: W) { - let task = builder.build(self.core.get_resource_map_mut()); - let task_id = task.id(); - schedule::submit_test_tasks(&mut self.core, vec![task]); + pub fn new_task_running<W: Into<WorkerId>, T: Into<TaskId>>( + &mut self, + task_id: T, + builder: &TaskBuilder, + worker_id: W, + ) { + let task_id = self.new_task(task_id, builder); schedule::start_on_worker_running(&mut self.core, task_id, worker_id.into()); } + pub fn new_tasks_cpus(&mut self, tasks: &[ResourceUnits]) -> Vec<TaskId> { + tasks + .iter() + .map(|n_cpus| { + let task_id = self.task_id_counter; + self.task_id_counter += 1; + self.new_task_cpus(task_id, *n_cpus) + }) + .collect() + } + + pub fn new_assigned_tasks_cpus(&mut self, tasks: &[&[ResourceUnits]]) { + for (i, tdefs) in tasks.iter().enumerate() { + let w_id = WorkerId::new(100 + i as u32); + let task_ids = self.new_tasks_cpus(tdefs); + for task_id in task_ids { + self._test_assign(task_id, w_id); + } + } + } + pub fn worker<W: Into<WorkerId>>(&self, worker_id: W) -> &Worker { self.core.get_worker_by_id_or_panic(worker_id.into()) } - pub fn new_workers_ext( - &mut self, - defs: &[(u32, Option<Duration>, Vec<ResourceDescriptorItem>)], - ) { - for (i, (c, time_limit, rs)) in defs.iter().enumerate() { - let worker_id = WorkerId::new(self.worker_id_counter); - self.worker_id_counter += 1; - - let mut rs = rs.clone(); - rs.insert( - 0, - ResourceDescriptorItem { - name: "cpus".to_string(), - kind: ResourceDescriptorKind::simple_indices(*c), - }, - ); - let rd = ResourceDescriptor::new(rs, Default::default()); - - let wcfg = WorkerConfiguration { - resources: rd, - listen_address: format!("1.1.1.{i}:123"), - hostname: format!("test{i}"), - group: "default".to_string(), - work_dir: Default::default(), - heartbeat_interval: Duration::from_millis(1000), - overview_configuration: OverviewConfiguration { - send_interval: Some(Duration::from_millis(1000)), - gpu_families: Default::default(), - }, - idle_timeout: None, - time_limit: *time_limit, - on_server_lost: ServerLostPolicy::Stop, - max_parallel_downloads: DEFAULT_MAX_PARALLEL_DOWNLOADS, - max_download_tries: DEFAULT_MAX_DOWNLOAD_TRIES, - wait_between_download_tries: DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, - extra: Default::default(), - }; - - let worker = Worker::new( - worker_id, - wcfg, - &self.core.create_resource_map(), - Instant::now(), - ); - on_new_worker(&mut self.core, &mut TestComm::default(), worker); - } + pub fn new_worker_with_id<W: Into<WorkerId>>(&mut self, worker_id: W, 
builder: &WorkerBuilder) { + let worker_id = worker_id.into(); + let resource_id_map = self.core.create_resource_map(); + let worker = builder.build(worker_id, &resource_id_map, Instant::now()); + on_new_worker(&mut self.core, &mut TestComm::default(), worker); } - pub fn new_workers(&mut self, cpus: &[u32]) { - let defs: Vec<_> = cpus.iter().map(|c| (*c, None, Vec::new())).collect(); - self.new_workers_ext(&defs); + pub fn new_worker(&mut self, builder: &WorkerBuilder) { + let worker_id = WorkerId::new(self.worker_id_counter); + self.worker_id_counter += 1; + self.new_worker_with_id(worker_id, builder); } - pub fn new_ready_tasks_cpus(&mut self, tasks: &[ResourceUnits]) -> Vec<TaskId> { - let rmap = self.core.get_resource_map_mut(); - let tasks: Vec<_> = tasks - .iter() - .map(|n_cpus| { - let task_id = self.task_id_counter; - self.task_id_counter += 1; - TaskBuilder::new(task_id) - .resources(cpus_compact(*n_cpus)) - .build(rmap) - }) - .collect(); - let task_ids: Vec<_> = tasks.iter().map(|t| t.id).collect(); - schedule::submit_test_tasks(&mut self.core, tasks); - task_ids + pub fn new_workers(&mut self, n: usize, builder: &WorkerBuilder) { + (0..n).for_each(|_| self.new_worker(builder)); + } + + pub fn new_workers_cpus(&mut self, cpus: &[u32]) { + for c in cpus { + self.new_worker(&WorkerBuilder::new(*c)) + } } pub fn _test_assign(&mut self, task_id: TaskId, worker_id: WorkerId) { @@ -171,16 +170,6 @@ impl TestEnv { self._test_assign(task_id.into(), worker_id.into()); } - pub fn new_assigned_tasks_cpus(&mut self, tasks: &[&[ResourceUnits]]) { - for (i, tdefs) in tasks.iter().enumerate() { - let w_id = WorkerId::new(100 + i as u32); - let task_ids = self.new_ready_tasks_cpus(tdefs); - for task_id in task_ids { - self._test_assign(task_id, w_id); - } - } - } - pub fn get_worker_tasks<W: Into<WorkerId>>(&self, worker_id: W) -> Vec<TaskId> { utils::sorted_vec( self.core diff --git a/crates/tako/src/internal/tests/utils/mod.rs b/crates/tako/src/internal/tests/utils/mod.rs index d8a29a79f..dffe6b7f7 100644 --- a/crates/tako/src/internal/tests/utils/mod.rs +++ b/crates/tako/src/internal/tests/utils/mod.rs @@ -9,6 +9,8 @@ pub mod task; #[cfg(test)] pub mod worker; #[cfg(test)] +pub mod worker_comm; +#[cfg(test)] pub mod workflows; pub mod shared; diff --git a/crates/tako/src/internal/tests/utils/schedule.rs b/crates/tako/src/internal/tests/utils/schedule.rs index 69e2e84be..5b1779003 100644 --- a/crates/tako/src/internal/tests/utils/schedule.rs +++ b/crates/tako/src/internal/tests/utils/schedule.rs @@ -1,74 +1,12 @@ -use crate::internal::common::resources::ResourceDescriptor; use crate::internal::messages::worker::{TaskFinishedMsg, TaskOutput}; use crate::internal::scheduler::state::SchedulerState; use crate::internal::server::core::Core; -use crate::internal::server::reactor::{ - on_new_tasks, on_new_worker, on_task_finished, on_task_running, -}; +use crate::internal::server::reactor::{on_new_tasks, on_task_finished, on_task_running}; use crate::internal::server::task::Task; -use crate::internal::server::worker::Worker; use crate::internal::tests::utils::env::TestComm; use crate::internal::tests::utils::task::task_running_msg; -use crate::internal::worker::configuration::{ - DEFAULT_MAX_DOWNLOAD_TRIES, DEFAULT_MAX_PARALLEL_DOWNLOADS, - DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, OverviewConfiguration, -}; -use crate::resources::ResourceIdMap; -use crate::worker::{ServerLostPolicy, WorkerConfiguration}; use crate::{TaskId, WorkerId}; -use std::time::{Duration, Instant}; - -pub fn create_test_worker_config( - worker_id: WorkerId, 
resources: ResourceDescriptor, -) -> WorkerConfiguration { - WorkerConfiguration { - resources, - listen_address: format!("1.1.1.{worker_id}:123"), - hostname: format!("test{worker_id}"), - group: "default".to_string(), - work_dir: Default::default(), - heartbeat_interval: Duration::from_millis(1000), - overview_configuration: OverviewConfiguration { - send_interval: Some(Duration::from_millis(1000)), - gpu_families: Default::default(), - }, - idle_timeout: None, - time_limit: None, - on_server_lost: ServerLostPolicy::Stop, - max_parallel_downloads: DEFAULT_MAX_PARALLEL_DOWNLOADS, - max_download_tries: DEFAULT_MAX_DOWNLOAD_TRIES, - wait_between_download_tries: DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, - extra: Default::default(), - } -} - -pub fn new_test_worker( - core: &mut Core, - worker_id: WorkerId, - configuration: WorkerConfiguration, - resource_map: &ResourceIdMap, -) { - let worker = Worker::new(worker_id, configuration, resource_map, Instant::now()); - on_new_worker(core, &mut TestComm::default(), worker); -} - -pub fn create_test_worker(core: &mut Core, worker_id: WorkerId, cpus: u32) { - let wcfg = create_test_worker_config(worker_id, ResourceDescriptor::simple_cpus(cpus)); - new_test_worker( - core, - worker_id, - wcfg, - &ResourceIdMap::from_vec(vec!["cpus".to_string()]), - ); -} - -pub fn create_test_workers(core: &mut Core, cpus: &[u32]) { - for (i, c) in cpus.iter().enumerate() { - let worker_id = WorkerId::new((100 + i) as u32); - create_test_worker(core, worker_id, *c); - } -} +use std::time::Instant; pub fn submit_test_tasks(core: &mut Core, tasks: Vec<Task>) { on_new_tasks(core, &mut TestComm::default(), tasks); diff --git a/crates/tako/src/internal/tests/utils/task.rs b/crates/tako/src/internal/tests/utils/task.rs index be32dcbbf..699cc413f 100644 --- a/crates/tako/src/internal/tests/utils/task.rs +++ b/crates/tako/src/internal/tests/utils/task.rs @@ -13,8 +13,8 @@ use smallvec::SmallVec; use std::rc::Rc; use thin_vec::ThinVec; +#[derive(Clone)] pub struct TaskBuilder { - id: TaskId, task_deps: Set<TaskId>, data_deps: ThinVec<DataObjectId>, finished_resources: Vec<ResourceRequest>, @@ -25,9 +25,8 @@ } impl TaskBuilder { - pub fn new<T: Into<TaskId>>(id: T) -> TaskBuilder { + pub fn new() -> TaskBuilder { TaskBuilder { - id: id.into(), task_deps: Default::default(), data_deps: Default::default(), finished_resources: vec![], @@ -43,15 +42,15 @@ self } - pub fn task_deps(mut self, deps: &[&Task]) -> TaskBuilder { - self.task_deps = deps.iter().map(|&tr| tr.id).collect(); + pub fn task_deps(mut self, deps: &[TaskId]) -> TaskBuilder { + self.task_deps = deps.iter().copied().collect(); self } - pub fn data_dep(mut self, task: &Task, data_id: u32) -> TaskBuilder { - self.task_deps.insert(task.id); + pub fn data_dep(mut self, task_id: TaskId, data_id: u32) -> TaskBuilder { + self.task_deps.insert(task_id); self.data_deps - .push(DataObjectId::new(task.id, data_id.into())); + .push(DataObjectId::new(task_id, data_id.into())); self } @@ -72,7 +71,7 @@ self } - pub fn cpus_compact<A: Into<ResourceAmount>>(mut self, count: A) -> TaskBuilder { + pub fn cpus<A: Into<ResourceAmount>>(mut self, count: A) -> TaskBuilder { self.resources_builder = self.resources_builder.cpus(count); self } @@ -91,9 +90,14 @@ self } - pub fn build(self, resource_map: &mut GlobalResourceMapping) -> Task { - let last_resource = self.resources_builder.finish(); - let mut resources: SmallVec<[ResourceRequest; 1]> = self.finished_resources.into(); + pub fn build<T: Into<TaskId>>( + &self, + task_id: T, + resource_map: &mut GlobalResourceMapping, + ) -> 
Task { + let last_resource = self.resources_builder.clone().finish(); + let mut resources: SmallVec<[ResourceRequest; 1]> = + self.finished_resources.iter().cloned().collect(); resources.push(last_resource); for rq in &resources { rq.validate().unwrap(); @@ -101,10 +105,10 @@ let resources = ResourceRequestVariants::new(resources); let (rq_id, _) = resource_map.get_or_create_rq_id(resources); Task::new( - self.id, + task_id.into(), rq_id, - self.task_deps.into_iter().collect(), - self.data_deps, + self.task_deps.iter().copied().collect(), + self.data_deps.clone(), None, Rc::new(TaskConfiguration { time_limit: None, @@ -117,20 +121,6 @@ } } -pub fn task<T: Into<TaskId>>(id: T, resource_map: &mut GlobalResourceMapping) -> Task { - TaskBuilder::new(id.into()).build(resource_map) -} - -pub fn task_with_deps<T: Into<TaskId>>( - id: T, - deps: &[&Task], - resource_map: &mut GlobalResourceMapping, -) -> Task { - TaskBuilder::new(id.into()) - .task_deps(deps) - .build(resource_map) -} - pub fn task_running_msg<T: Into<TaskId>>(task_id: T) -> TaskRunningMsg { TaskRunningMsg { id: task_id.into(), diff --git a/crates/tako/src/internal/tests/utils/worker.rs b/crates/tako/src/internal/tests/utils/worker.rs index d18395c1f..315f8eade 100644 --- a/crates/tako/src/internal/tests/utils/worker.rs +++ b/crates/tako/src/internal/tests/utils/worker.rs @@ -1,49 +1,81 @@ -use crate::internal::messages::worker::FromWorkerMessage; +use crate::WorkerId; +use crate::internal::server::worker::Worker; +use crate::internal::worker::configuration::{ + DEFAULT_MAX_DOWNLOAD_TRIES, DEFAULT_MAX_PARALLEL_DOWNLOADS, + DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, OverviewConfiguration, +}; +use crate::resources::{ResourceDescriptor, ResourceDescriptorItem, ResourceIdMap}; +use crate::worker::{ServerLostPolicy, WorkerConfiguration}; +use std::time::{Duration, Instant}; -pub struct TestWorkerComm { - messages: Vec<FromWorkerMessage>, - worker_is_empty_notifications: usize, - start_task_notifications: usize, +pub struct WorkerBuilder { + descriptor: ResourceDescriptor, + time_limit: Option<Duration>, + group: Option<String>, } -impl Default for TestWorkerComm { - fn default() -> Self { - Self::new() +impl WorkerBuilder { + pub fn empty() -> Self { + WorkerBuilder { + descriptor: ResourceDescriptor::new(Default::default(), Default::default()), + time_limit: None, + group: None, + } } -} -impl TestWorkerComm { - pub fn new() -> Self { - TestWorkerComm { - messages: Vec::new(), - worker_is_empty_notifications: 0, - start_task_notifications: 0, + pub fn new(cpus: u32) -> Self { + WorkerBuilder { + descriptor: ResourceDescriptor::simple_cpus(cpus), + time_limit: None, + group: None, + } } } - pub fn check_emptiness(&self) { - assert!(self.messages.is_empty()); - assert_eq!(self.worker_is_empty_notifications, 0); - assert_eq!(self.start_task_notifications, 0); + pub fn time_limit(mut self, duration: Duration) -> Self { + self.time_limit = Some(duration); + self } - pub fn take_start_task_notifications(&mut self) -> usize { - std::mem::take(&mut self.start_task_notifications) + pub fn group(mut self, group: &str) -> Self { + self.group = Some(group.to_string()); + self } - pub fn check_start_task_notifications(&mut self, count: usize) { - assert_eq!(self.take_start_task_notifications(), count); + pub fn res_sum(mut self, name: &str, amount: u32) -> Self { + self.descriptor + .resources + .push(ResourceDescriptorItem::sum(name, amount)); + self } - pub fn send_message_to_server(&mut self, message: FromWorkerMessage) { - self.messages.push(message); + pub fn res_range(mut self, name: &str, start: 
diff --git a/crates/tako/src/internal/tests/utils/worker.rs b/crates/tako/src/internal/tests/utils/worker.rs
index d18395c1f..315f8eade 100644
--- a/crates/tako/src/internal/tests/utils/worker.rs
+++ b/crates/tako/src/internal/tests/utils/worker.rs
@@ -1,49 +1,81 @@
-use crate::internal::messages::worker::FromWorkerMessage;
+use crate::WorkerId;
+use crate::internal::server::worker::Worker;
+use crate::internal::worker::configuration::{
+    DEFAULT_MAX_DOWNLOAD_TRIES, DEFAULT_MAX_PARALLEL_DOWNLOADS,
+    DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, OverviewConfiguration,
+};
+use crate::resources::{ResourceDescriptor, ResourceDescriptorItem, ResourceIdMap};
+use crate::worker::{ServerLostPolicy, WorkerConfiguration};
+use std::time::{Duration, Instant};
 
-pub struct TestWorkerComm {
-    messages: Vec<FromWorkerMessage>,
-    worker_is_empty_notifications: usize,
-    start_task_notifications: usize,
+pub struct WorkerBuilder {
+    descriptor: ResourceDescriptor,
+    time_limit: Option<Duration>,
+    group: Option<String>,
 }
 
-impl Default for TestWorkerComm {
-    fn default() -> Self {
-        Self::new()
+impl WorkerBuilder {
+    pub fn empty() -> Self {
+        WorkerBuilder {
+            descriptor: ResourceDescriptor::new(Default::default(), Default::default()),
+            time_limit: None,
+            group: None,
+        }
     }
-}
 
-impl TestWorkerComm {
-    pub fn new() -> Self {
-        TestWorkerComm {
-            messages: Vec::new(),
-            worker_is_empty_notifications: 0,
-            start_task_notifications: 0,
+    pub fn new(cpus: u32) -> Self {
+        WorkerBuilder {
+            descriptor: ResourceDescriptor::simple_cpus(cpus),
+            time_limit: None,
+            group: None,
         }
     }
 
-    pub fn check_emptiness(&self) {
-        assert!(self.messages.is_empty());
-        assert_eq!(self.worker_is_empty_notifications, 0);
-        assert_eq!(self.start_task_notifications, 0);
+    pub fn time_limit(mut self, duration: Duration) -> Self {
+        self.time_limit = Some(duration);
+        self
     }
 
-    pub fn take_start_task_notifications(&mut self) -> usize {
-        std::mem::take(&mut self.start_task_notifications)
+    pub fn group(mut self, group: &str) -> Self {
+        self.group = Some(group.to_string());
+        self
     }
 
-    pub fn check_start_task_notifications(&mut self, count: usize) {
-        assert_eq!(self.take_start_task_notifications(), count);
+    pub fn res_sum(mut self, name: &str, amount: u32) -> Self {
+        self.descriptor
+            .resources
+            .push(ResourceDescriptorItem::sum(name, amount));
+        self
     }
 
-    pub fn send_message_to_server(&mut self, message: FromWorkerMessage) {
-        self.messages.push(message);
+    pub fn res_range(mut self, name: &str, start: u32, end: u32) -> Self {
+        self.descriptor
+            .resources
+            .push(ResourceDescriptorItem::range(name, start, end));
+        self
     }
 
-    pub fn notify_worker_is_empty(&mut self) {
-        self.worker_is_empty_notifications += 1;
-    }
+    pub fn build(&self, worker_id: WorkerId, resource_map: &ResourceIdMap, now: Instant) -> Worker {
+        let config = WorkerConfiguration {
+            resources: self.descriptor.clone(),
+            listen_address: format!("1.1.1.{worker_id}:123"),
+            hostname: format!("test{worker_id}"),
+            group: self.group.as_deref().unwrap_or("default").to_string(),
+            work_dir: Default::default(),
+            heartbeat_interval: Duration::from_millis(1000),
+            overview_configuration: OverviewConfiguration {
+                send_interval: Some(Duration::from_millis(1000)),
+                gpu_families: Default::default(),
+            },
+            idle_timeout: None,
+            time_limit: self.time_limit,
+            on_server_lost: ServerLostPolicy::Stop,
+            max_parallel_downloads: DEFAULT_MAX_PARALLEL_DOWNLOADS,
+            max_download_tries: DEFAULT_MAX_DOWNLOAD_TRIES,
+            wait_between_download_tries: DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES,
+            extra: Default::default(),
+        };
 
-    pub fn notify_start_task(&mut self) {
-        self.start_task_notifications += 1;
+        Worker::new(worker_id, config, resource_map, now)
     }
 }
diff --git a/crates/tako/src/internal/tests/utils/worker_comm.rs b/crates/tako/src/internal/tests/utils/worker_comm.rs
new file mode 100644
index 000000000..d18395c1f
--- /dev/null
+++ b/crates/tako/src/internal/tests/utils/worker_comm.rs
@@ -0,0 +1,49 @@
+use crate::internal::messages::worker::FromWorkerMessage;
+
+pub struct TestWorkerComm {
+    messages: Vec<FromWorkerMessage>,
+    worker_is_empty_notifications: usize,
+    start_task_notifications: usize,
+}
+
+impl Default for TestWorkerComm {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl TestWorkerComm {
+    pub fn new() -> Self {
+        TestWorkerComm {
+            messages: Vec::new(),
+            worker_is_empty_notifications: 0,
+            start_task_notifications: 0,
+        }
+    }
+
+    pub fn check_emptiness(&self) {
+        assert!(self.messages.is_empty());
+        assert_eq!(self.worker_is_empty_notifications, 0);
+        assert_eq!(self.start_task_notifications, 0);
+    }
+
+    pub fn take_start_task_notifications(&mut self) -> usize {
+        std::mem::take(&mut self.start_task_notifications)
+    }
+
+    pub fn check_start_task_notifications(&mut self, count: usize) {
+        assert_eq!(self.take_start_task_notifications(), count);
+    }
+
+    pub fn send_message_to_server(&mut self, message: FromWorkerMessage) {
+        self.messages.push(message);
+    }
+
+    pub fn notify_worker_is_empty(&mut self) {
+        self.worker_is_empty_notifications += 1;
+    }
+
+    pub fn notify_start_task(&mut self) {
+        self.start_task_notifications += 1;
+    }
+}
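Note: `WorkerBuilder` centralizes the `WorkerConfiguration` boilerplate that tests previously assembled by hand. A sketch of standalone use, assuming `ResourceIdMap::from_vec` remains available as one way to obtain a resource map (most tests will instead go through `TestEnv::new_worker_with_id`):

```rust
use std::time::{Duration, Instant};

// Resource names must be known to the map before workers reference them.
let resource_map = ResourceIdMap::from_vec(vec!["cpus".to_string(), "gpus".to_string()]);

// A worker with 4 CPUs, indexed GPUs 1..=4, and a one-hour time limit.
let _worker = WorkerBuilder::new(4)
    .res_range("gpus", 1, 4)
    .time_limit(Duration::from_secs(3_600))
    .build(1.into(), &resource_map, Instant::now());
```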
diff --git a/crates/tako/src/internal/tests/utils/workflows.rs b/crates/tako/src/internal/tests/utils/workflows.rs
index fb84e5896..e54d53d44 100644
--- a/crates/tako/src/internal/tests/utils/workflows.rs
+++ b/crates/tako/src/internal/tests/utils/workflows.rs
@@ -1,10 +1,7 @@
-use crate::internal::server::core::Core;
-use crate::internal::tests::utils::schedule::submit_test_tasks;
-use crate::internal::tests::utils::task;
 use crate::internal::tests::utils::task::TaskBuilder;
-use task::task_with_deps;
+use crate::tests::utils::env::TestEnv;
 
-pub fn submit_example_1(core: &mut Core) {
+pub fn submit_example_1(rt: &mut TestEnv) {
     /*
        11  12
        \  / \
@@ -14,42 +11,16 @@
        |
        17
     */
-    let rmap = core.get_resource_map_mut();
-    let t1 = task::task(11, rmap);
-    let t2 = task::task(12, rmap);
-    let t3 = task_with_deps(13, &[&t1, &t2], rmap);
-    let t4 = task_with_deps(14, &[&t2], rmap);
-    let t5 = task_with_deps(15, &[&t3, &t4], rmap);
-    let t6 = task_with_deps(16, &[&t3], rmap);
-    let t7 = task_with_deps(17, &[&t6], rmap);
-    submit_test_tasks(core, vec![t1, t2, t3, t4, t5, t6, t7]);
+    let t1 = rt.new_task_default(11);
+    let t2 = rt.new_task_default(12);
+    let t3 = rt.new_task(13, &TaskBuilder::new().task_deps(&[t1, t2]));
+    let t4 = rt.new_task(14, &TaskBuilder::new().task_deps(&[t2]));
+    let _t5 = rt.new_task(15, &TaskBuilder::new().task_deps(&[t3, t4]));
+    let t6 = rt.new_task(16, &TaskBuilder::new().task_deps(&[t3]));
+    let _t7 = rt.new_task(17, &TaskBuilder::new().task_deps(&[t6]));
 }
 
-pub fn submit_example_2(core: &mut Core) {
-    /*
-         T1
-        /  \
-       T2   T3
-       |  / | \
-       T4   |  T6
-        \   \
-         \  /   T7
-          T5
-    */
-
-    let rmap = core.get_resource_map_mut();
-    let t1 = task_with_deps(1, &[], rmap);
-    let t2 = task_with_deps(2, &[&t1], rmap);
-    let t3 = task_with_deps(3, &[&t1], rmap);
-    let t4 = task_with_deps(4, &[&t2, &t3], rmap);
-    let t5 = task_with_deps(5, &[&t4], rmap);
-    let t6 = task_with_deps(6, &[&t3], rmap);
-    let t7 = task_with_deps(7, &[&t6], rmap);
-
-    submit_test_tasks(core, vec![t1, t2, t3, t4, t5, t6, t7]);
-}
-
-pub fn submit_example_3(core: &mut Core) {
+pub fn submit_example_3(rt: &mut TestEnv) {
     /* Task deps
        T1   T2
       / |\  / \
@@ -58,18 +29,16 @@
      T3  | T4  T5
      \  \ |   /
       \  \  /
        T6
     */
-    let rmap = core.get_resource_map_mut();
-    let t1 = TaskBuilder::new(1).task_deps(&[]).build(rmap);
-    let t2 = TaskBuilder::new(2).task_deps(&[]).build(rmap);
-    let t3 = TaskBuilder::new(3).task_deps(&[&t1]).build(rmap);
-    let t4 = TaskBuilder::new(4).task_deps(&[&t1, &t2]).build(rmap);
-    let t5 = TaskBuilder::new(5).task_deps(&[&t2]).build(rmap);
-    let t6 = TaskBuilder::new(6).task_deps(&[&t1, &t5, &t3]).build(rmap);
-    submit_test_tasks(core, vec![t1, t2, t3, t4, t5, t6]);
+    let t1 = rt.new_task_default(1);
+    let t2 = rt.new_task_default(2);
+    let t3 = rt.new_task(3, &TaskBuilder::new().task_deps(&[t1]));
+    let _t4 = rt.new_task(4, &TaskBuilder::new().task_deps(&[t1, t2]));
+    let t5 = rt.new_task(5, &TaskBuilder::new().task_deps(&[t2]));
+    let _t6 = rt.new_task(6, &TaskBuilder::new().task_deps(&[t1, t5, t3]));
 }
 
-pub fn submit_example_4(core: &mut Core) {
+pub fn submit_example_4(rt: &mut TestEnv) {
     /* Task DATA deps
        T1  T2
        |   |\
        \  /| \
@@ -78,14 +47,13 @@
         T3
     */
-    let rmap = core.get_resource_map_mut();
-    let t1 = TaskBuilder::new(1).build(rmap);
-    let t2 = TaskBuilder::new(2).build(rmap);
-    let t3 = TaskBuilder::new(3)
-        .data_dep(&t1, 0)
-        .data_dep(&t2, 0)
-        .data_dep(&t2, 1)
-        .build(rmap);
-
-    submit_test_tasks(core, vec![t1, t2, t3]);
+    let t1 = rt.new_task_default(1);
+    let t2 = rt.new_task_default(2);
+    let _t3 = rt.new_task(
+        3,
+        &TaskBuilder::new()
+            .data_dep(t1, 0)
+            .data_dep(t2, 0)
+            .data_dep(t2, 1),
+    );
 }
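Note: the workflow helpers now receive the whole `TestEnv` rather than a bare `Core`. A hypothetical consuming test, assuming the `TestEnv::new` constructor and `core()` accessor used elsewhere in this PR:

```rust
#[test]
fn submits_example_graph() {
    let mut rt = TestEnv::new();
    submit_example_1(&mut rt);

    // All seven tasks of the example graph are registered in the server core.
    for id in [11, 12, 13, 14, 15, 16, 17] {
        assert!(rt.core().find_task(id.into()).is_some());
    }
}
```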
diff --git a/crates/tako/src/internal/worker/comm.rs b/crates/tako/src/internal/worker/comm.rs
index baff83400..88b0551bb 100644
--- a/crates/tako/src/internal/worker/comm.rs
+++ b/crates/tako/src/internal/worker/comm.rs
@@ -14,7 +14,7 @@ pub struct RealWorkerComm {
 pub enum WorkerComm {
     Real(RealWorkerComm),
     #[cfg(test)]
-    Test(crate::internal::tests::utils::worker::TestWorkerComm),
+    Test(crate::internal::tests::utils::worker_comm::TestWorkerComm),
 }
 
 impl WorkerComm {
@@ -28,11 +28,11 @@ impl WorkerComm {
 
     #[cfg(test)]
     pub fn new_test_comm() -> Self {
-        WorkerComm::Test(crate::internal::tests::utils::worker::TestWorkerComm::new())
+        WorkerComm::Test(crate::internal::tests::utils::worker_comm::TestWorkerComm::new())
     }
 
     #[cfg(test)]
-    pub fn test(&mut self) -> &mut crate::internal::tests::utils::worker::TestWorkerComm {
+    pub fn test(&mut self) -> &mut crate::internal::tests::utils::worker_comm::TestWorkerComm {
         match self {
             Self::Real(_) => panic!("Cannot get testing testing comm"),
             Self::Test(comm) => comm,
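Note: on the worker side nothing changes functionally; the test double simply moves to its own `worker_comm` module. A sketch of how the relocated `TestWorkerComm` is exercised (method names as defined in the new `worker_comm.rs` above; the surrounding test body is hypothetical):

```rust
use crate::internal::tests::utils::worker_comm::TestWorkerComm;

let mut comm = TestWorkerComm::new();
comm.check_emptiness();                 // nothing recorded yet

comm.notify_start_task();
comm.check_start_task_notifications(1); // takes (and resets) the counter

comm.check_emptiness();                 // passes: the check above reset it
```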