diff --git a/rust-stream/Cargo.lock b/rust-stream/Cargo.lock index 7c5ec13..5f225f0 100644 --- a/rust-stream/Cargo.lock +++ b/rust-stream/Cargo.lock @@ -30,9 +30,9 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" [[package]] name = "bitflags" -version = "1.2.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "cfg-if" @@ -102,9 +102,9 @@ dependencies = [ [[package]] name = "crossbeam-deque" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" +checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" dependencies = [ "cfg-if", "crossbeam-epoch", @@ -186,18 +186,18 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" dependencies = [ "libc", ] [[package]] name = "instant" -version = "0.1.9" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ "cfg-if", ] @@ -220,15 +220,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.97" +version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12b8adadd720df158f4d70dfe7ccc6adb0472d7c55ca83445f6a5ab3e36f8fb6" +checksum = 
"8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119" [[package]] name = "lock_api" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0382880606dff6d15c9476c416d18690b72742aa7b605bb6dd6ec9030fbf07eb" +checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109" dependencies = [ "scopeguard", ] @@ -253,9 +253,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.7.11" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf80d3e903b34e0bd7282b218398aec54e082c840d9baf8339e0080a0c542956" +checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" dependencies = [ "libc", "log", @@ -303,9 +303,9 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", "lock_api", @@ -314,9 +314,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" dependencies = [ "cfg-if", "instant", @@ -361,18 +361,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.27" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0d8caf72986c1a598726adc988bb5984792ef84f5ee5aa50209145ee8077038" +checksum = "ba508cc11742c0dc5c1659771673afbab7a0efab23aa17e854cbab0837ed0b43" dependencies = [ "unicode-xid", ] [[package]] name = "quote" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" +checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05" dependencies = [ "proc-macro2", ] @@ -404,9 +404,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.2.8" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "742739e41cd49414de871ea5e549afb7e2a3ac77b589bcbebe8c82fab37147fc" +checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" dependencies = [ "bitflags", ] @@ -502,9 +502,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.6.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" +checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" [[package]] name = "strsim" @@ -514,9 +514,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" [[package]] name = "structopt" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5277acd7ee46e63e5168a80734c9f6ee81b1367a7d8772a2d765df2a3705d28c" +checksum = "40b9788f4202aa75c240ecc9c15c65185e6a39ccdeb0fd5d008b98825464c87c" dependencies = [ "clap", "lazy_static", @@ -525,9 +525,9 @@ dependencies = [ [[package]] name = "structopt-derive" -version = "0.4.14" +version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ba9cdfda491b814720b6b06e0cac513d922fc407582032e8706e9f137976f90" +checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" dependencies = [ "heck", "proc-macro-error", @@ -538,9 +538,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.73" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f71489ff30030d2ae598524f61326b902466f72a0fb1a8564c001cc63425bcc7" +checksum = 
"8daf5dd0bb60cbd4137b1b587d2fc0ae729bc07cf01cd70b36a1ed5ade3b9d59" dependencies = [ "proc-macro2", "quote", @@ -573,15 +573,15 @@ checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c" [[package]] name = "unicode-segmentation" -version = "1.7.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796" +checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b" [[package]] name = "unicode-width" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" +checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" [[package]] name = "unicode-xid" diff --git a/rust-stream/README.md b/rust-stream/README.md index ee9c056..6696de5 100644 --- a/rust-stream/README.md +++ b/rust-stream/README.md @@ -10,15 +10,21 @@ Currently, we support three CPU threading API as devices: see [rayon_stream.rs](src/rayon_stream.rs) * [Crossbeam](https://github.com/crossbeam-rs/crossbeam) - Parallel with partitions per thread, see [crossbeam_stream.rs](src/crossbeam_stream.rs) +* Arc - Parallel with `Vec` per thread (static partitions) wrapped in `Mutex` contained in `Arc`s, + see [arc_stream.rs](src/arc_stream.rs) +* Unsafe - Parallel with unsafe pointer per thread (static partitions) to `Vec`, + see [unsafe_stream.rs](src/unsafe_stream.rs) In addition, this implementation also supports the following extra flags: - +**** ``` --init Initialise each benchmark array at allocation time on the main thread --malloc Use libc malloc instead of the Rust's allocator for benchmark array allocation --pin Pin threads to distinct cores, this has NO effect in Rayon devices ``` +Max thread count is controlled by the environment variable `BABELSTREAM_NUM_THREADS` which is compatible with all devices (avoid setting 
`RAYON_NUM_THREADS`, the implementation will issue a warning if this happened). + There is an ongoing investigation on potential performance issues under NUMA situations. As part of the experiment, this implementation made use of the provisional [Allocator traits](https://github.com/rust-lang/rust/issues/32838) which requires rust diff --git a/rust-stream/rustfmt.toml b/rust-stream/rustfmt.toml index 23c912d..aa2f0e9 100644 --- a/rust-stream/rustfmt.toml +++ b/rust-stream/rustfmt.toml @@ -54,7 +54,7 @@ use_field_init_shorthand = false force_explicit_abi = true condense_wildcard_suffixes = false color = "Auto" -required_version = "1.4.36" +required_version = "1.4.38" unstable_features = false disable_all_formatting = false skip_children = false diff --git a/rust-stream/src/arc_stream.rs b/rust-stream/src/arc_stream.rs new file mode 100644 index 0000000..006f73a --- /dev/null +++ b/rust-stream/src/arc_stream.rs @@ -0,0 +1,254 @@ +use std::iter::Sum; +use std::sync::{Arc, Mutex}; + +use self::core_affinity::CoreId; +use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData}; + +struct ArcHeapData { + a_chunks: Vec>>>, + b_chunks: Vec>>>, + c_chunks: Vec>>>, +} + +pub struct ArcDevice { + pub(crate) ncore: usize, + pub(crate) pin: bool, + pub(crate) core_ids: Vec, + data: ArcHeapData, +} + +impl ArcDevice { + pub fn new(ncore: usize, pin: bool, alloc: A) -> Self { + let mut core_ids = match core_affinity::get_core_ids() { + Some(xs) => xs, + None => { + colour::e_red_ln!("Cannot enumerate cores, pinning will not work if enabled"); + (0..ncore).map(|i| CoreId { id: i }).collect() + } + }; + core_ids.resize(ncore, core_ids[0]); + + let lift = + || (0..ncore).map(|_| return Arc::new(Mutex::new(Vec::new_in(alloc)))).collect::>(); + let data = ArcHeapData { a_chunks: lift(), b_chunks: lift(), c_chunks: lift() }; + + ArcDevice { ncore, pin, core_ids, data } + } + + pub fn ref_a(&self, t: usize) -> Arc>> { self.data.a_chunks[t].clone() } + + pub fn ref_b(&self, t: 
usize) -> Arc>> { self.data.b_chunks[t].clone() } + + pub fn ref_c(&self, t: usize) -> Arc>> { self.data.c_chunks[t].clone() } + + // divide the length by the number of cores; trailing cores get less (possibly no) work if it does not divide + fn chunk_size(&self, len: usize, t: usize) -> usize { + assert!(t < self.ncore); + let chunk = (len as f64 / self.ncore as f64).ceil() as usize; + if t == self.ncore - 1 { + len.saturating_sub(t * chunk) + } else { + chunk.min(len.saturating_sub(t * chunk)) + } + } +} + +extern crate core_affinity; + +// Arc+Mutex threaded version, it should be semantically equal to the single threaded version +impl + RustStream for StreamData, A> +{ + fn init_arrays(&mut self) { + let init = self.init; + let pin = self.device.pin; + (0..self.device.ncore) + .map(&|t| { + let ref_a = self.device.ref_a(t); + let ref_b = self.device.ref_b(t); + let ref_c = self.device.ref_c(t); + let core = self.device.core_ids[t]; + let n = self.device.chunk_size(self.size, t); + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + ref_a.lock().unwrap().resize(n, init.0); + ref_b.lock().unwrap().resize(n, init.1); + ref_c.lock().unwrap().resize(n, init.2); + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + fn read_arrays(&mut self) { + let range = self.size; + let unlift = |drain: &mut Vec, source: &Vec>>>| { + let xs = + source.into_iter().flat_map(|x| x.lock().unwrap().clone().into_iter()).collect::>(); + for i in 0..range { + drain[i] = xs[i]; + } + }; + unlift(&mut self.a, &self.device.data.a_chunks); + unlift(&mut self.b, &self.device.data.b_chunks); + unlift(&mut self.c, &self.device.data.c_chunks); + } + + fn copy(&mut self) { + let pin = self.device.pin; + (0..self.device.ncore) + .map(move |t| { + let ref_a = self.device.ref_a(t); + let ref_c = self.device.ref_c(t); + let core = self.device.core_ids[t]; + let n = self.device.chunk_size(self.size, t); + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let a = 
ref_a.lock().unwrap(); + let mut c = ref_c.lock().unwrap(); + for i in 0..n { + c[i] = a[i]; + } + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + + fn mul(&mut self) { + let scalar = self.scalar; + let pin = self.device.pin; + (0..self.device.ncore) + .map(move |t| { + let ref_b = self.device.ref_b(t); + let ref_c = self.device.ref_c(t); + let core = self.device.core_ids[t]; + let n = self.device.chunk_size(self.size, t); + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let mut b = ref_b.lock().unwrap(); + let c = ref_c.lock().unwrap(); + for i in 0..n { + b[i] = scalar * c[i]; + } + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + + fn add(&mut self) { + let pin = self.device.pin; + (0..self.device.ncore) + .map(&|t| { + let ref_a = self.device.ref_a(t); + let ref_b = self.device.ref_b(t); + let ref_c = self.device.ref_c(t); + let core = self.device.core_ids[t]; + let n = self.device.chunk_size(self.size, t); + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let a = ref_a.lock().unwrap(); + let b = ref_b.lock().unwrap(); + let mut c = ref_c.lock().unwrap(); + for i in 0..n { + c[i] = a[i] + b[i]; + } + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + + fn triad(&mut self) { + let scalar = self.scalar; + let pin = self.device.pin; + (0..self.device.ncore) + .map(&|t| { + let ref_a = self.device.ref_a(t); + let ref_b = self.device.ref_b(t); + let ref_c = self.device.ref_c(t); + let core = self.device.core_ids[t]; + let n = self.device.chunk_size(self.size, t); + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let mut a = ref_a.lock().unwrap(); + let b = ref_b.lock().unwrap(); + let c = ref_c.lock().unwrap(); + for i in 0..n { + a[i] = b[i] + scalar * c[i] + } + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + + fn 
nstream(&mut self) { + let scalar = self.scalar; + let pin = self.device.pin; + (0..self.device.ncore) + .map(&|t| { + let ref_a = self.device.ref_a(t); + let ref_b = self.device.ref_b(t); + let ref_c = self.device.ref_c(t); + let core = self.device.core_ids[t]; + let n = self.device.chunk_size(self.size, t); + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let mut a = ref_a.lock().unwrap(); + let b = ref_b.lock().unwrap(); + let c = ref_c.lock().unwrap(); + for i in 0..n { + a[i] += b[i] + scalar * c[i] + } + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + + fn dot(&mut self) -> T { + let pin = self.device.pin; + (0..self.device.ncore) + .map(&|t| { + let ref_a = self.device.ref_a(t); + let ref_b = self.device.ref_b(t); + let core = self.device.core_ids[t]; + let n = self.device.chunk_size(self.size, t); + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let a = ref_a.lock().unwrap(); + let b = ref_b.lock().unwrap(); + let mut p = T::default(); + for i in 0..n { + p += a[i] * b[i]; + } + p + }) + }) + .collect::>() + .into_iter() + .map(|t| t.join().unwrap()) + .sum() + } +} diff --git a/rust-stream/src/crossbeam_stream.rs b/rust-stream/src/crossbeam_stream.rs index 85b6503..44358ae 100644 --- a/rust-stream/src/crossbeam_stream.rs +++ b/rust-stream/src/crossbeam_stream.rs @@ -6,13 +6,13 @@ use crossbeam::thread; use self::core_affinity::CoreId; use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData}; -pub struct ThreadedDevice { +pub struct CrossbeamDevice { pub(crate) ncore: usize, pub(crate) pin: bool, pub(crate) core_ids: Vec, } -impl ThreadedDevice { +impl CrossbeamDevice { pub fn new(ncore: usize, pin: bool) -> Self { let mut core_ids = match core_affinity::get_core_ids() { Some(xs) => xs, @@ -22,15 +22,13 @@ impl ThreadedDevice { } }; core_ids.resize(ncore, core_ids[0]); - ThreadedDevice { ncore, pin, core_ids } + CrossbeamDevice { 
ncore, pin, core_ids } } } -impl ThreadedDevice { +impl CrossbeamDevice { // divide the length by the number of cores, the last core gets less work if it does not divide - fn chunk_size(&self, len: usize) -> usize { - (len as f64 / self.ncore as f64).ceil() as usize - } + fn chunk_size(&self, len: usize) -> usize { (len as f64 / self.ncore as f64).ceil() as usize } // make a mutable chunk from the vec fn mk_mut_chunks<'a, T, A: AllocatorType>(&self, xs: &'a mut Vec) -> ChunksMut<'a, T> { @@ -48,7 +46,7 @@ extern crate core_affinity; // Crossbeam threaded version, it should be semantically equal to the single threaded version impl RustStream - for StreamData + for StreamData { fn init_arrays(&mut self) { thread::scope(|s| { diff --git a/rust-stream/src/lib.rs b/rust-stream/src/lib.rs index 0f34c02..3ac72c3 100644 --- a/rust-stream/src/lib.rs +++ b/rust-stream/src/lib.rs @@ -2,6 +2,7 @@ #![feature(vec_into_raw_parts)] use std::alloc::System; +use std::env; use std::fmt::{Debug, Display}; use std::iter::Sum; use std::mem::size_of; @@ -11,15 +12,19 @@ use num_traits::abs; use structopt::StructOpt; use tabular::{Row, Table}; -use crate::crossbeam_stream::ThreadedDevice; +use crate::arc_stream::ArcDevice; +use crate::crossbeam_stream::CrossbeamDevice; use crate::plain_stream::SerialDevice; use crate::rayon_stream::RayonDevice; use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData}; +use crate::unsafe_stream::UnsafeDevice; +mod arc_stream; mod crossbeam_stream; mod plain_stream; mod rayon_stream; mod stream; +mod unsafe_stream; #[derive(Debug, StructOpt)] struct Options { @@ -119,9 +124,7 @@ fn check_solution, D, A: AllocatorType> fn run_cpu + Display, D, A: AllocatorType>( option: &Options, mut stream: StreamData, ) -> bool -where - StreamData: RustStream, -{ +where StreamData: RustStream { let benchmark = match (option.nstream_only, option.triad_only) { (true, false) => Benchmark::NStream, (false, true) => Benchmark::Triad, @@ -175,7 +178,8 @@ where let 
tabulate = |xs: &Vec, name: &str, t_size: usize| -> Vec<(&str, String)> { let tail = &xs[1..]; // tail only - // do stats + + // do stats let max = tail.iter().max().map(|d| d.as_secs_f64()); let min = tail.iter().min().map(|d| d.as_secs_f64()); match (min, max) { @@ -234,6 +238,7 @@ where let solutions_correct = match benchmark { Benchmark::All => { let (results, sum) = stream.run_all(option.numtimes); + stream.read_arrays(); let correct = check_solution(benchmark, option.numtimes, &stream, Some(sum)); tabulate_all(vec![ tabulate(&results.copy, "Copy", 2 * array_bytes), @@ -246,12 +251,14 @@ where } Benchmark::NStream => { let results = stream.run_nstream(option.numtimes); + stream.read_arrays(); let correct = check_solution(benchmark, option.numtimes, &stream, None); tabulate_all(vec![tabulate(&results, "Nstream", 4 * array_bytes)]); correct } Benchmark::Triad => { let results = stream.run_triad(option.numtimes); + stream.read_arrays(); let correct = check_solution(benchmark, option.numtimes, &stream, None); let total_bytes = 3 * array_bytes * option.numtimes; let bandwidth = giga_scale * (total_bytes as f64 / results.as_secs_f64()); @@ -260,7 +267,7 @@ where correct } }; - &stream.clean_up(); + stream.clean_up(); solutions_correct } @@ -274,139 +281,138 @@ static START_SCALAR: f32 = 0.4; static FLOAT_INIT_SCALAR: f32 = START_SCALAR; static FLOAT_INIT: (f32, f32, f32) = (START_A, START_B, START_C); -static DOUBLE_START_SCALAR: f64 = START_SCALAR as f64; +static DOUBLE_INIT_SCALAR: f64 = START_SCALAR as f64; static DOUBLE_INIT: (f64, f64, f64) = (START_A as f64, START_B as f64, START_C as f64); pub fn run(args: &Vec) -> bool { + let opt: Options = Options::from_iter(args); - let options: Options = Options::from_iter(args); - - if options.numtimes < 2 { + if opt.numtimes < 2 { panic!("numtimes must be >= 2") } let alloc = System; - let alloc_name = if options.malloc { "libc-malloc" } else { "rust-system" }; + let alloc_name = if opt.malloc { "libc-malloc" } else { 
"rust-system" }; + + fn mk_data( + opt: &Options, init: (T, T, T), scalar: T, dev: D, alloc: A, + ) -> StreamData { + StreamData::new_in(opt.arraysize, scalar, init, dev, alloc, opt.malloc, opt.init) + } + + let num_thread_key = "BABELSTREAM_NUM_THREADS"; + let max_ncores = num_cpus::get(); + let ncores = match env::var(num_thread_key) { + Ok(v) => match v.parse::() { + Err(bad) => { + colour::e_yellow_ln!( + "Cannot parse {} (reason: {}), defaulting to {}", + num_thread_key, + bad, + max_ncores + ); + max_ncores + } + Ok(n) if n <= 0 || n > max_ncores as i64 => { + println!("{} out of bound ({}), defaulting to {}", num_thread_key, n, max_ncores); + max_ncores + } + Ok(n) => n as usize, + }, + Err(_) => { + println!("{} not set, defaulting to max ({})", num_thread_key, max_ncores); + max_ncores + } + }; let rayon_device = &|| { - let dev = RayonDevice { pool: rayon::ThreadPoolBuilder::default().build().unwrap() }; - if !options.csv { + let rayon_num_thread_key = "RAYON_NUM_THREADS"; + if env::var(rayon_num_thread_key).is_ok() { + colour::e_yellow_ln!("{} is ignored, set {} instead", rayon_num_thread_key, num_thread_key) + } + let dev = RayonDevice { + pool: rayon::ThreadPoolBuilder::default().num_threads(ncores).build().unwrap(), + }; + if !opt.csv { println!("Using {} thread(s), alloc={}", dev.pool.current_num_threads(), alloc_name); - if options.pin { + if opt.pin { colour::e_yellow_ln!("Pinning threads have no effect on Rayon!") } } - if options.float { - run_cpu( - &options, - StreamData::new_in( - options.arraysize, - FLOAT_INIT_SCALAR, - FLOAT_INIT, - dev, - alloc, - options.malloc, - options.init, - ), - ) + if opt.float { + run_cpu(&opt, mk_data(&opt, FLOAT_INIT, FLOAT_INIT_SCALAR, dev, alloc)) } else { - run_cpu( - &options, - StreamData::new_in( - options.arraysize, - DOUBLE_START_SCALAR, - DOUBLE_INIT, - dev, - alloc, - options.malloc, - options.init, - ), - ) + run_cpu(&opt, mk_data(&opt, DOUBLE_INIT, DOUBLE_INIT_SCALAR, dev, alloc)) + } + }; + + let 
arc_device = &|| { + if !opt.csv { + println!("Using {} thread, pin={}, alloc={}", ncores, opt.pin, alloc_name); + } + if opt.float { + let dev = ArcDevice::::new(ncores, opt.pin, alloc); + run_cpu(&opt, mk_data(&opt, FLOAT_INIT, FLOAT_INIT_SCALAR, dev, alloc)) + } else { + let dev = ArcDevice::::new(ncores, opt.pin, alloc); + run_cpu(&opt, mk_data(&opt, DOUBLE_INIT, DOUBLE_INIT_SCALAR, dev, alloc)) + } + }; + + let unsafe_device = &|| { + if !opt.csv { + println!("Using {} thread, pin={}, alloc={}", ncores, opt.pin, alloc_name); + } + if opt.float { + let dev = UnsafeDevice::::new(ncores, opt.pin); + run_cpu(&opt, mk_data(&opt, FLOAT_INIT, FLOAT_INIT_SCALAR, dev, alloc)) + } else { + let dev = UnsafeDevice::::new(ncores, opt.pin); + run_cpu(&opt, mk_data(&opt, DOUBLE_INIT, DOUBLE_INIT_SCALAR, dev, alloc)) } }; let crossbeam_device = &|| { - let ncores = num_cpus::get(); - let dev = ThreadedDevice::new(ncores, options.pin); - if !options.csv { - println!("Using {} thread(s), pin={}, alloc={}", ncores, options.pin, alloc_name) + let dev = CrossbeamDevice::new(ncores, opt.pin); + if !opt.csv { + println!("Using {} thread(s), pin={}, alloc={}", ncores, opt.pin, alloc_name) } - if options.float { - run_cpu( - &options, - StreamData::new_in( - options.arraysize, - FLOAT_INIT_SCALAR, - FLOAT_INIT, - dev, - alloc, - options.malloc, - options.init, - ), - ) + if opt.float { + run_cpu(&opt, mk_data(&opt, FLOAT_INIT, FLOAT_INIT_SCALAR, dev, alloc)) } else { - run_cpu( - &options, - StreamData::new_in( - options.arraysize, - DOUBLE_START_SCALAR, - DOUBLE_INIT, - dev, - alloc, - options.malloc, - options.init, - ), - ) + run_cpu(&opt, mk_data(&opt, DOUBLE_INIT, DOUBLE_INIT_SCALAR, dev, alloc)) } }; + let st_device = &|| { - let dev = SerialDevice { pin: options.pin }; - if !options.csv { - println!("Using 1 thread, pin={}, alloc={}", options.pin, alloc_name); + let dev = SerialDevice { pin: opt.pin }; + if !opt.csv { + println!("Using 1 thread, pin={}, alloc={}", opt.pin, 
alloc_name); } - if options.float { - run_cpu( - &options, - StreamData::new_in( - options.arraysize, - FLOAT_INIT_SCALAR, - FLOAT_INIT, - dev, - alloc, - options.malloc, - options.init, - ), - ) + if opt.float { + run_cpu(&opt, mk_data(&opt, FLOAT_INIT, FLOAT_INIT_SCALAR, dev, alloc)) } else { - run_cpu( - &options, - StreamData::new_in( - options.arraysize, - DOUBLE_START_SCALAR, - DOUBLE_INIT, - dev, - alloc, - options.malloc, - options.init, - ), - ) + run_cpu(&opt, mk_data(&opt, DOUBLE_INIT, DOUBLE_INIT_SCALAR, dev, alloc)) } }; + let devices: Vec<(String, &'_ dyn Fn() -> bool)> = vec![ - ("CPU (Rayon)".to_string(), rayon_device), - (format!("CPU (Crossbeam, pinning={})", options.pin), crossbeam_device), ("CPU (Single threaded)".to_string(), st_device), + ("CPU (Rayon)".to_string(), rayon_device), + (format!("CPU (Arc, pinning={})", opt.pin), arc_device), + (format!("CPU (Unsafe, pinning={})", opt.pin), unsafe_device), + (format!("CPU (Crossbeam, pinning={})", opt.pin), crossbeam_device), ]; - if options.list { + if opt.list { devices.iter().enumerate().for_each(|(i, (name, _))| { println!("[{}] {}", i, name); }); true } else { - match devices.get(options.device) { + match devices.get(opt.device) { Some((name, run)) => { - if !&options.csv { + if !&opt.csv { println!( "BabelStream\n\ Version: {}\n\ @@ -414,14 +420,14 @@ pub fn run(args: &Vec) -> bool { VERSION.unwrap_or("unknown"), name ); - if options.init { + if opt.init { println!("Initialising arrays on main thread"); } } run() } None => { - eprintln!("Device index {} not available", options.device); + eprintln!("Device index {} not available", opt.device); false } } diff --git a/rust-stream/src/stream.rs b/rust-stream/src/stream.rs index 1568d4a..560c6f1 100644 --- a/rust-stream/src/stream.rs +++ b/rust-stream/src/stream.rs @@ -124,6 +124,7 @@ impl StreamData { pub trait RustStream { fn init_arrays(&mut self); + fn read_arrays(&mut self) {} // default to no-op as most impl. 
doesn't need this fn copy(&mut self); fn mul(&mut self); fn add(&mut self); diff --git a/rust-stream/src/unsafe_stream.rs b/rust-stream/src/unsafe_stream.rs new file mode 100644 index 0000000..968cc4e --- /dev/null +++ b/rust-stream/src/unsafe_stream.rs @@ -0,0 +1,266 @@ +extern crate core_affinity; + +use std::alloc::Allocator; +use std::iter::Sum; +use std::ops::Range; + +use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData}; + +use self::core_affinity::CoreId; + +#[derive(Debug, Copy, Clone)] +struct UnsafeData(*mut T, usize); + +impl UnsafeData { + fn empty() -> UnsafeData { UnsafeData(([] as [T; 0]).as_mut_ptr(), 0) } + fn new(xs: &mut Vec) -> UnsafeData { + UnsafeData(xs.as_mut_ptr(), xs.len()) + } + + fn get_slice(&self) -> &mut [T] { unsafe { std::slice::from_raw_parts_mut(self.0, self.1) } } +} + +unsafe impl Send for UnsafeData {} +unsafe impl Sync for UnsafeData {} + +#[derive(Debug, Copy, Clone)] +struct UnsafeRefs { + a: UnsafeData, + b: UnsafeData, + c: UnsafeData, +} + +unsafe impl Send for UnsafeRefs {} +unsafe impl Sync for UnsafeRefs {} + +pub struct UnsafeDevice { + pub(crate) ncore: usize, + pub(crate) pin: bool, + pub(crate) core_ids: Vec, + data: UnsafeRefs, +} + +impl UnsafeDevice { + pub fn new(ncore: usize, pin: bool) -> Self { + let mut core_ids = match core_affinity::get_core_ids() { + Some(xs) => xs, + None => { + colour::e_red_ln!("Cannot enumerate cores, pinning will not work if enabled"); + (0..ncore).map(|i| CoreId { id: i }).collect() + } + }; + core_ids.resize(ncore, core_ids[0]); + + UnsafeDevice { + ncore, + pin, + core_ids, + data: UnsafeRefs { a: UnsafeData::empty(), b: UnsafeData::empty(), c: UnsafeData::empty() }, + } + } + + fn thread_ranges(&self, len: usize) -> Vec<(usize, Range)> { + let chunk = (len as f64 / self.ncore as f64).ceil() as usize; + // clamp both ends to `len`: with a rounded-up chunk, trailing threads would otherwise get + // ranges that run past the end of the arrays (or start after they end) + (0..self.ncore) + .map(|t| (t, (t * chunk).min(len)..((t + 1) * chunk).min(len))) + .collect::>() + } +} + +// Unsafe 
threaded version, it should be semantically equal to the single threaded version +impl RustStream + for StreamData, A> +{ + fn init_arrays(&mut self) { + self.device.data.a = UnsafeData::new(&mut self.a); + self.device.data.b = UnsafeData::new(&mut self.b); + self.device.data.c = UnsafeData::new(&mut self.c); + let init = self.init; + let pin = self.device.pin; + let data = self.device.data; + self + .device + .thread_ranges(self.size) + .into_iter() + .map(|(t, r)| { + let core = self.device.core_ids[t]; + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let a = data.a.get_slice(); + let b = data.b.get_slice(); + let c = data.c.get_slice(); + for i in r { + a[i] = init.0; + b[i] = init.1; + c[i] = init.2; + } + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + + fn copy(&mut self) { + let pin = self.device.pin; + let data = self.device.data; + self + .device + .thread_ranges(self.size) + .into_iter() + .map(|(t, r)| { + let core = self.device.core_ids[t]; + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let a = data.a.get_slice(); + let c = data.c.get_slice(); + for i in r { + c[i] = a[i]; + } + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + + fn mul(&mut self) { + let scalar = self.scalar; + let pin = self.device.pin; + let data = self.device.data; + self + .device + .thread_ranges(self.size) + .into_iter() + .map(|(t, r)| { + let core = self.device.core_ids[t]; + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let b = data.b.get_slice(); + let c = data.c.get_slice(); + for i in r { + b[i] = scalar * c[i]; + } + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + + fn add(&mut self) { + let pin = self.device.pin; + let data = self.device.data; + self + .device + .thread_ranges(self.size) + .into_iter() + .map(|(t, r)| { + let core = self.device.core_ids[t]; + 
std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let a = data.a.get_slice(); + let b = data.b.get_slice(); + let c = data.c.get_slice(); + for i in r { + c[i] = a[i] + b[i]; + } + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + + fn triad(&mut self) { + let scalar = self.scalar; + let pin = self.device.pin; + let data = self.device.data; + self + .device + .thread_ranges(self.size) + .into_iter() + .map(|(t, r)| { + let core = self.device.core_ids[t]; + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let a = data.a.get_slice(); + let b = data.b.get_slice(); + let c = data.c.get_slice(); + for i in r { + a[i] = b[i] + scalar * c[i] + } + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + + fn nstream(&mut self) { + let scalar = self.scalar; + let pin = self.device.pin; + let data = self.device.data; + self + .device + .thread_ranges(self.size) + .into_iter() + .map(|(t, r)| { + let core = self.device.core_ids[t]; + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let a = data.a.get_slice(); + let b = data.b.get_slice(); + let c = data.c.get_slice(); + for i in r { + a[i] += b[i] + scalar * c[i] + } + }) + }) + .collect::>() + .into_iter() + .for_each(|t| t.join().unwrap()); + } + + fn dot(&mut self) -> T { + let pin = self.device.pin; + let data = self.device.data; + self + .device + .thread_ranges(self.size) + .into_iter() + .map(|(t, r)| { + let core = self.device.core_ids[t]; + std::thread::spawn(move || { + if pin { + core_affinity::set_for_current(core); + } + let a = data.a.get_slice(); + let b = data.b.get_slice(); + let mut p = T::default(); + for i in r { + p += a[i] * b[i]; + } + p + }) + }) + .collect::>() + .into_iter() + .map(|t| t.join().unwrap()) + .sum() + } +} diff --git a/rust-stream/tests/integration_test.rs b/rust-stream/tests/integration_test.rs index 101f8f8..8031a79 
100644 --- a/rust-stream/tests/integration_test.rs +++ b/rust-stream/tests/integration_test.rs @@ -2,7 +2,7 @@ use rstest::rstest; #[rstest] fn test_main( - #[values(0, 1, 2)] device: usize, // + #[values(0, 1, 2, 3, 4)] device: usize, // #[values("", "--pin")] pin: &str, // #[values("", "--malloc")] malloc: &str, // #[values("", "--init")] init: &str, //