Add unsafe and Arc implementation

Add BABELSTREAM_NUM_THREADS env option
Refactor driver
Bump dependencies
This commit is contained in:
Tom Lin 2021-12-06 15:55:52 +00:00
parent e3bd58378f
commit c61b93dc65
9 changed files with 681 additions and 150 deletions

72
rust-stream/Cargo.lock generated
View File

@ -30,9 +30,9 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.2.1" version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
@ -102,9 +102,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-deque" name = "crossbeam-deque"
version = "0.8.0" version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"crossbeam-epoch", "crossbeam-epoch",
@ -186,18 +186,18 @@ dependencies = [
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
version = "0.1.18" version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c" checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [ dependencies = [
"libc", "libc",
] ]
[[package]] [[package]]
name = "instant" name = "instant"
version = "0.1.9" version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
] ]
@ -220,15 +220,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.97" version = "0.2.108"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12b8adadd720df158f4d70dfe7ccc6adb0472d7c55ca83445f6a5ab3e36f8fb6" checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.4.4" version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0382880606dff6d15c9476c416d18690b72742aa7b605bb6dd6ec9030fbf07eb" checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
dependencies = [ dependencies = [
"scopeguard", "scopeguard",
] ]
@ -253,9 +253,9 @@ dependencies = [
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.7.11" version = "0.7.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf80d3e903b34e0bd7282b218398aec54e082c840d9baf8339e0080a0c542956" checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
@ -303,9 +303,9 @@ dependencies = [
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.1" version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb" checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [ dependencies = [
"instant", "instant",
"lock_api", "lock_api",
@ -314,9 +314,9 @@ dependencies = [
[[package]] [[package]]
name = "parking_lot_core" name = "parking_lot_core"
version = "0.8.3" version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"instant", "instant",
@ -361,18 +361,18 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.27" version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0d8caf72986c1a598726adc988bb5984792ef84f5ee5aa50209145ee8077038" checksum = "ba508cc11742c0dc5c1659771673afbab7a0efab23aa17e854cbab0837ed0b43"
dependencies = [ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.9" version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -404,9 +404,9 @@ dependencies = [
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.8" version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "742739e41cd49414de871ea5e549afb7e2a3ac77b589bcbebe8c82fab37147fc" checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
dependencies = [ dependencies = [
"bitflags", "bitflags",
] ]
@ -502,9 +502,9 @@ dependencies = [
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.6.1" version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
[[package]] [[package]]
name = "strsim" name = "strsim"
@ -514,9 +514,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]] [[package]]
name = "structopt" name = "structopt"
version = "0.3.21" version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5277acd7ee46e63e5168a80734c9f6ee81b1367a7d8772a2d765df2a3705d28c" checksum = "40b9788f4202aa75c240ecc9c15c65185e6a39ccdeb0fd5d008b98825464c87c"
dependencies = [ dependencies = [
"clap", "clap",
"lazy_static", "lazy_static",
@ -525,9 +525,9 @@ dependencies = [
[[package]] [[package]]
name = "structopt-derive" name = "structopt-derive"
version = "0.4.14" version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ba9cdfda491b814720b6b06e0cac513d922fc407582032e8706e9f137976f90" checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0"
dependencies = [ dependencies = [
"heck", "heck",
"proc-macro-error", "proc-macro-error",
@ -538,9 +538,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.73" version = "1.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f71489ff30030d2ae598524f61326b902466f72a0fb1a8564c001cc63425bcc7" checksum = "8daf5dd0bb60cbd4137b1b587d2fc0ae729bc07cf01cd70b36a1ed5ade3b9d59"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -573,15 +573,15 @@ checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c"
[[package]] [[package]]
name = "unicode-segmentation" name = "unicode-segmentation"
version = "1.7.1" version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796" checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]] [[package]]
name = "unicode-width" name = "unicode-width"
version = "0.1.8" version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973"
[[package]] [[package]]
name = "unicode-xid" name = "unicode-xid"

View File

@ -10,15 +10,21 @@ Currently, we support three CPU threading API as devices:
see [rayon_stream.rs](src/rayon_stream.rs) see [rayon_stream.rs](src/rayon_stream.rs)
* [Crossbeam](https://github.com/crossbeam-rs/crossbeam) - Parallel with partitions per thread, * [Crossbeam](https://github.com/crossbeam-rs/crossbeam) - Parallel with partitions per thread,
see [crossbeam_stream.rs](src/crossbeam_stream.rs) see [crossbeam_stream.rs](src/crossbeam_stream.rs)
* Arc - Parallel with `Vec` per thread (static partitions) wrapped in `Mutex` contained in `Arc`s,
see [arc_stream.rs](src/arc_stream.rs)
* Unsafe - Parallel with unsafe pointer per thread (static partitions) to `Vec`,
see [unsafe_stream.rs](src/unsafe_stream.rs)
In addition, this implementation also supports the following extra flags: In addition, this implementation also supports the following extra flags:
****
``` ```
--init Initialise each benchmark array at allocation time on the main thread --init Initialise each benchmark array at allocation time on the main thread
--malloc Use libc malloc instead of the Rust's allocator for benchmark array allocation --malloc Use libc malloc instead of the Rust's allocator for benchmark array allocation
--pin Pin threads to distinct cores, this has NO effect in Rayon devices --pin Pin threads to distinct cores, this has NO effect in Rayon devices
``` ```
Max thread count is controlled by the environment variable `BABELSTREAM_NUM_THREADS`, which is compatible with all devices (avoid setting `RAYON_NUM_THREADS`; the implementation will issue a warning if it is set).
There is an ongoing investigation on potential performance issues under NUMA situations. As part of There is an ongoing investigation on potential performance issues under NUMA situations. As part of
the experiment, this implementation made use of the the experiment, this implementation made use of the
provisional [Allocator traits](https://github.com/rust-lang/rust/issues/32838) which requires rust provisional [Allocator traits](https://github.com/rust-lang/rust/issues/32838) which requires rust

View File

@ -54,7 +54,7 @@ use_field_init_shorthand = false
force_explicit_abi = true force_explicit_abi = true
condense_wildcard_suffixes = false condense_wildcard_suffixes = false
color = "Auto" color = "Auto"
required_version = "1.4.36" required_version = "1.4.38"
unstable_features = false unstable_features = false
disable_all_formatting = false disable_all_formatting = false
skip_children = false skip_children = false

View File

@ -0,0 +1,254 @@
use std::iter::Sum;
use std::sync::{Arc, Mutex};
use self::core_affinity::CoreId;
use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData};
/// Per-thread storage for the three benchmark arrays (a, b, c).
/// Element `t` of each vector is the chunk owned by worker thread `t`,
/// shared with that thread through an `Arc<Mutex<_>>`.
struct ArcHeapData<T: ArrayType, A: AllocatorType> {
    a_chunks: Vec<Arc<Mutex<Vec<T, A>>>>,
    b_chunks: Vec<Arc<Mutex<Vec<T, A>>>>,
    c_chunks: Vec<Arc<Mutex<Vec<T, A>>>>,
}
/// BabelStream device that partitions each array statically into per-thread
/// chunks and shares them through `Arc<Mutex<_>>`.
pub struct ArcDevice<T: ArrayType, A: AllocatorType> {
    pub(crate) ncore: usize,          // number of worker threads to spawn per kernel
    pub(crate) pin: bool,             // pin each worker to a distinct core if true
    pub(crate) core_ids: Vec<CoreId>, // one core id per thread (padded to ncore entries)
    data: ArcHeapData<T, A>,          // the chunked benchmark arrays
}
impl<T: ArrayType, A: AllocatorType> ArcDevice<T, A> {
    /// Creates a device running `ncore` worker threads, optionally pinned to cores.
    ///
    /// `alloc` is the allocator used for every per-thread chunk; the chunks start
    /// empty and are sized later by `init_arrays`. If core enumeration fails (or
    /// yields an empty list) a warning is printed and synthetic ids are used, so
    /// pinning degrades gracefully instead of panicking.
    pub fn new(ncore: usize, pin: bool, alloc: A) -> Self {
        let mut core_ids = match core_affinity::get_core_ids() {
            // Guard against Some(vec![]) too: core_ids[0] below would panic.
            Some(xs) if !xs.is_empty() => xs,
            _ => {
                colour::e_red_ln!("Cannot enumerate cores, pinning will not work if enabled");
                (0..ncore).map(|i| CoreId { id: i }).collect()
            }
        };
        // Pad (or truncate) so indexing by thread number is always in bounds.
        core_ids.resize(ncore, core_ids[0]);
        let lift =
            || (0..ncore).map(|_| Arc::new(Mutex::new(Vec::new_in(alloc)))).collect::<Vec<_>>();
        let data = ArcHeapData { a_chunks: lift(), b_chunks: lift(), c_chunks: lift() };
        ArcDevice { ncore, pin, core_ids, data }
    }

    // Cloning the Arc hands a worker thread shared ownership of one chunk.
    pub fn ref_a(&self, t: usize) -> Arc<Mutex<Vec<T, A>>> { self.data.a_chunks[t].clone() }
    pub fn ref_b(&self, t: usize) -> Arc<Mutex<Vec<T, A>>> { self.data.b_chunks[t].clone() }
    pub fn ref_c(&self, t: usize) -> Arc<Mutex<Vec<T, A>>> { self.data.c_chunks[t].clone() }

    // divide the length by the number of cores, the last core gets less work if it does not divide
    fn chunk_size(&self, len: usize, t: usize) -> usize {
        assert!(t < self.ncore);
        let chunk = (len as f64 / self.ncore as f64).ceil() as usize;
        if t == self.ncore - 1 {
            // saturating_sub guards against usize underflow when ncore > len:
            // earlier threads may already cover the whole array, in which case
            // the last thread simply gets an empty chunk instead of panicking.
            len.saturating_sub(t * chunk)
        } else {
            chunk
        }
    }
}
extern crate core_affinity;
// Arc+Mutex threaded version, it should be semantically equal to the single threaded version
//
// Every kernel follows the same shape: spawn one thread per core, each thread
// receives Arc clones of only the chunks it touches, optionally pins itself,
// locks its chunks, runs the loop over its chunk length `n`, then the main
// thread joins all workers. Collecting the JoinHandles into a Vec BEFORE
// joining is what makes the threads run concurrently rather than one-by-one.
impl<T: 'static + ArrayType + Sync + Send + Sum, A: AllocatorType + Sync + Send + 'static>
    RustStream<T> for StreamData<T, ArcDevice<T, A>, A>
{
    // Sizes each per-thread chunk and fills it with the benchmark's initial
    // (a, b, c) values, in parallel on the worker threads themselves.
    fn init_arrays(&mut self) {
        let init = self.init;
        let pin = self.device.pin;
        (0..self.device.ncore)
            .map(&|t| {
                let ref_a = self.device.ref_a(t);
                let ref_b = self.device.ref_b(t);
                let ref_c = self.device.ref_c(t);
                let core = self.device.core_ids[t];
                let n = self.device.chunk_size(self.size, t);
                std::thread::spawn(move || {
                    if pin {
                        core_affinity::set_for_current(core);
                    }
                    ref_a.lock().unwrap().resize(n, init.0);
                    ref_b.lock().unwrap().resize(n, init.1);
                    ref_c.lock().unwrap().resize(n, init.2);
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .for_each(|t| t.join().unwrap());
    }
    // Copies the chunked data back into the flat a/b/c vectors so the generic
    // solution checker can read them; clones each chunk out of its Mutex.
    fn read_arrays(&mut self) {
        let range = self.size;
        let unlift = |drain: &mut Vec<T, A>, source: &Vec<Arc<Mutex<Vec<T, A>>>>| {
            // Flatten all chunks (in thread order) into one contiguous Vec,
            // then copy the first `range` elements into the destination.
            let xs =
                source.into_iter().flat_map(|x| x.lock().unwrap().clone().into_iter()).collect::<Vec<_>>();
            for i in 0..range {
                drain[i] = xs[i];
            }
        };
        unlift(&mut self.a, &self.device.data.a_chunks);
        unlift(&mut self.b, &self.device.data.b_chunks);
        unlift(&mut self.c, &self.device.data.c_chunks);
    }
    // c[i] = a[i]
    fn copy(&mut self) {
        let pin = self.device.pin;
        (0..self.device.ncore)
            .map(move |t| {
                let ref_a = self.device.ref_a(t);
                let ref_c = self.device.ref_c(t);
                let core = self.device.core_ids[t];
                let n = self.device.chunk_size(self.size, t);
                std::thread::spawn(move || {
                    if pin {
                        core_affinity::set_for_current(core);
                    }
                    let a = ref_a.lock().unwrap();
                    let mut c = ref_c.lock().unwrap();
                    for i in 0..n {
                        c[i] = a[i];
                    }
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .for_each(|t| t.join().unwrap());
    }
    // b[i] = scalar * c[i]
    fn mul(&mut self) {
        let scalar = self.scalar;
        let pin = self.device.pin;
        (0..self.device.ncore)
            .map(move |t| {
                let ref_b = self.device.ref_b(t);
                let ref_c = self.device.ref_c(t);
                let core = self.device.core_ids[t];
                let n = self.device.chunk_size(self.size, t);
                std::thread::spawn(move || {
                    if pin {
                        core_affinity::set_for_current(core);
                    }
                    let mut b = ref_b.lock().unwrap();
                    let c = ref_c.lock().unwrap();
                    for i in 0..n {
                        b[i] = scalar * c[i];
                    }
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .for_each(|t| t.join().unwrap());
    }
    // c[i] = a[i] + b[i]
    fn add(&mut self) {
        let pin = self.device.pin;
        (0..self.device.ncore)
            .map(&|t| {
                let ref_a = self.device.ref_a(t);
                let ref_b = self.device.ref_b(t);
                let ref_c = self.device.ref_c(t);
                let core = self.device.core_ids[t];
                let n = self.device.chunk_size(self.size, t);
                std::thread::spawn(move || {
                    if pin {
                        core_affinity::set_for_current(core);
                    }
                    let a = ref_a.lock().unwrap();
                    let b = ref_b.lock().unwrap();
                    let mut c = ref_c.lock().unwrap();
                    for i in 0..n {
                        c[i] = a[i] + b[i];
                    }
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .for_each(|t| t.join().unwrap());
    }
    // a[i] = b[i] + scalar * c[i]
    fn triad(&mut self) {
        let scalar = self.scalar;
        let pin = self.device.pin;
        (0..self.device.ncore)
            .map(&|t| {
                let ref_a = self.device.ref_a(t);
                let ref_b = self.device.ref_b(t);
                let ref_c = self.device.ref_c(t);
                let core = self.device.core_ids[t];
                let n = self.device.chunk_size(self.size, t);
                std::thread::spawn(move || {
                    if pin {
                        core_affinity::set_for_current(core);
                    }
                    let mut a = ref_a.lock().unwrap();
                    let b = ref_b.lock().unwrap();
                    let c = ref_c.lock().unwrap();
                    for i in 0..n {
                        a[i] = b[i] + scalar * c[i]
                    }
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .for_each(|t| t.join().unwrap());
    }
    // a[i] += b[i] + scalar * c[i]
    fn nstream(&mut self) {
        let scalar = self.scalar;
        let pin = self.device.pin;
        (0..self.device.ncore)
            .map(&|t| {
                let ref_a = self.device.ref_a(t);
                let ref_b = self.device.ref_b(t);
                let ref_c = self.device.ref_c(t);
                let core = self.device.core_ids[t];
                let n = self.device.chunk_size(self.size, t);
                std::thread::spawn(move || {
                    if pin {
                        core_affinity::set_for_current(core);
                    }
                    let mut a = ref_a.lock().unwrap();
                    let b = ref_b.lock().unwrap();
                    let c = ref_c.lock().unwrap();
                    for i in 0..n {
                        a[i] += b[i] + scalar * c[i]
                    }
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .for_each(|t| t.join().unwrap());
    }
    // Returns sum(a[i] * b[i]); each worker computes a partial dot product over
    // its chunk and the main thread sums the partials after joining.
    fn dot(&mut self) -> T {
        let pin = self.device.pin;
        (0..self.device.ncore)
            .map(&|t| {
                let ref_a = self.device.ref_a(t);
                let ref_b = self.device.ref_b(t);
                let core = self.device.core_ids[t];
                let n = self.device.chunk_size(self.size, t);
                std::thread::spawn(move || {
                    if pin {
                        core_affinity::set_for_current(core);
                    }
                    let a = ref_a.lock().unwrap();
                    let b = ref_b.lock().unwrap();
                    let mut p = T::default();
                    for i in 0..n {
                        p += a[i] * b[i];
                    }
                    p
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .map(|t| t.join().unwrap())
            .sum()
    }
}

View File

@ -6,13 +6,13 @@ use crossbeam::thread;
use self::core_affinity::CoreId; use self::core_affinity::CoreId;
use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData}; use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData};
pub struct ThreadedDevice { pub struct CrossbeamDevice {
pub(crate) ncore: usize, pub(crate) ncore: usize,
pub(crate) pin: bool, pub(crate) pin: bool,
pub(crate) core_ids: Vec<CoreId>, pub(crate) core_ids: Vec<CoreId>,
} }
impl ThreadedDevice { impl CrossbeamDevice {
pub fn new(ncore: usize, pin: bool) -> Self { pub fn new(ncore: usize, pin: bool) -> Self {
let mut core_ids = match core_affinity::get_core_ids() { let mut core_ids = match core_affinity::get_core_ids() {
Some(xs) => xs, Some(xs) => xs,
@ -22,15 +22,13 @@ impl ThreadedDevice {
} }
}; };
core_ids.resize(ncore, core_ids[0]); core_ids.resize(ncore, core_ids[0]);
ThreadedDevice { ncore, pin, core_ids } CrossbeamDevice { ncore, pin, core_ids }
} }
} }
impl ThreadedDevice { impl CrossbeamDevice {
// divide the length by the number of cores, the last core gets less work if it does not divide // divide the length by the number of cores, the last core gets less work if it does not divide
fn chunk_size(&self, len: usize) -> usize { fn chunk_size(&self, len: usize) -> usize { (len as f64 / self.ncore as f64).ceil() as usize }
(len as f64 / self.ncore as f64).ceil() as usize
}
// make a mutable chunk from the vec // make a mutable chunk from the vec
fn mk_mut_chunks<'a, T, A: AllocatorType>(&self, xs: &'a mut Vec<T, A>) -> ChunksMut<'a, T> { fn mk_mut_chunks<'a, T, A: AllocatorType>(&self, xs: &'a mut Vec<T, A>) -> ChunksMut<'a, T> {
@ -48,7 +46,7 @@ extern crate core_affinity;
// Crossbeam threaded version, it should be semantically equal to the single threaded version // Crossbeam threaded version, it should be semantically equal to the single threaded version
impl<T: ArrayType + Sync + Send + Sum, A: AllocatorType + Sync + Send> RustStream<T> impl<T: ArrayType + Sync + Send + Sum, A: AllocatorType + Sync + Send> RustStream<T>
for StreamData<T, ThreadedDevice, A> for StreamData<T, CrossbeamDevice, A>
{ {
fn init_arrays(&mut self) { fn init_arrays(&mut self) {
thread::scope(|s| { thread::scope(|s| {

View File

@ -2,6 +2,7 @@
#![feature(vec_into_raw_parts)] #![feature(vec_into_raw_parts)]
use std::alloc::System; use std::alloc::System;
use std::env;
use std::fmt::{Debug, Display}; use std::fmt::{Debug, Display};
use std::iter::Sum; use std::iter::Sum;
use std::mem::size_of; use std::mem::size_of;
@ -11,15 +12,19 @@ use num_traits::abs;
use structopt::StructOpt; use structopt::StructOpt;
use tabular::{Row, Table}; use tabular::{Row, Table};
use crate::crossbeam_stream::ThreadedDevice; use crate::arc_stream::ArcDevice;
use crate::crossbeam_stream::CrossbeamDevice;
use crate::plain_stream::SerialDevice; use crate::plain_stream::SerialDevice;
use crate::rayon_stream::RayonDevice; use crate::rayon_stream::RayonDevice;
use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData}; use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData};
use crate::unsafe_stream::UnsafeDevice;
mod arc_stream;
mod crossbeam_stream; mod crossbeam_stream;
mod plain_stream; mod plain_stream;
mod rayon_stream; mod rayon_stream;
mod stream; mod stream;
mod unsafe_stream;
#[derive(Debug, StructOpt)] #[derive(Debug, StructOpt)]
struct Options { struct Options {
@ -119,9 +124,7 @@ fn check_solution<T: ArrayType + Display + Sum + Into<f64>, D, A: AllocatorType>
fn run_cpu<T: ArrayType + Sync + Send + Sum + Into<f64> + Display, D, A: AllocatorType>( fn run_cpu<T: ArrayType + Sync + Send + Sum + Into<f64> + Display, D, A: AllocatorType>(
option: &Options, mut stream: StreamData<T, D, A>, option: &Options, mut stream: StreamData<T, D, A>,
) -> bool ) -> bool
where where StreamData<T, D, A>: RustStream<T> {
StreamData<T, D, A>: RustStream<T>,
{
let benchmark = match (option.nstream_only, option.triad_only) { let benchmark = match (option.nstream_only, option.triad_only) {
(true, false) => Benchmark::NStream, (true, false) => Benchmark::NStream,
(false, true) => Benchmark::Triad, (false, true) => Benchmark::Triad,
@ -175,6 +178,7 @@ where
let tabulate = |xs: &Vec<Duration>, name: &str, t_size: usize| -> Vec<(&str, String)> { let tabulate = |xs: &Vec<Duration>, name: &str, t_size: usize| -> Vec<(&str, String)> {
let tail = &xs[1..]; // tail only let tail = &xs[1..]; // tail only
// do stats // do stats
let max = tail.iter().max().map(|d| d.as_secs_f64()); let max = tail.iter().max().map(|d| d.as_secs_f64());
let min = tail.iter().min().map(|d| d.as_secs_f64()); let min = tail.iter().min().map(|d| d.as_secs_f64());
@ -234,6 +238,7 @@ where
let solutions_correct = match benchmark { let solutions_correct = match benchmark {
Benchmark::All => { Benchmark::All => {
let (results, sum) = stream.run_all(option.numtimes); let (results, sum) = stream.run_all(option.numtimes);
stream.read_arrays();
let correct = check_solution(benchmark, option.numtimes, &stream, Some(sum)); let correct = check_solution(benchmark, option.numtimes, &stream, Some(sum));
tabulate_all(vec![ tabulate_all(vec![
tabulate(&results.copy, "Copy", 2 * array_bytes), tabulate(&results.copy, "Copy", 2 * array_bytes),
@ -246,12 +251,14 @@ where
} }
Benchmark::NStream => { Benchmark::NStream => {
let results = stream.run_nstream(option.numtimes); let results = stream.run_nstream(option.numtimes);
stream.read_arrays();
let correct = check_solution(benchmark, option.numtimes, &stream, None); let correct = check_solution(benchmark, option.numtimes, &stream, None);
tabulate_all(vec![tabulate(&results, "Nstream", 4 * array_bytes)]); tabulate_all(vec![tabulate(&results, "Nstream", 4 * array_bytes)]);
correct correct
} }
Benchmark::Triad => { Benchmark::Triad => {
let results = stream.run_triad(option.numtimes); let results = stream.run_triad(option.numtimes);
stream.read_arrays();
let correct = check_solution(benchmark, option.numtimes, &stream, None); let correct = check_solution(benchmark, option.numtimes, &stream, None);
let total_bytes = 3 * array_bytes * option.numtimes; let total_bytes = 3 * array_bytes * option.numtimes;
let bandwidth = giga_scale * (total_bytes as f64 / results.as_secs_f64()); let bandwidth = giga_scale * (total_bytes as f64 / results.as_secs_f64());
@ -260,7 +267,7 @@ where
correct correct
} }
}; };
&stream.clean_up(); stream.clean_up();
solutions_correct solutions_correct
} }
@ -274,139 +281,138 @@ static START_SCALAR: f32 = 0.4;
static FLOAT_INIT_SCALAR: f32 = START_SCALAR; static FLOAT_INIT_SCALAR: f32 = START_SCALAR;
static FLOAT_INIT: (f32, f32, f32) = (START_A, START_B, START_C); static FLOAT_INIT: (f32, f32, f32) = (START_A, START_B, START_C);
static DOUBLE_START_SCALAR: f64 = START_SCALAR as f64; static DOUBLE_INIT_SCALAR: f64 = START_SCALAR as f64;
static DOUBLE_INIT: (f64, f64, f64) = (START_A as f64, START_B as f64, START_C as f64); static DOUBLE_INIT: (f64, f64, f64) = (START_A as f64, START_B as f64, START_C as f64);
pub fn run(args: &Vec<String>) -> bool { pub fn run(args: &Vec<String>) -> bool {
let opt: Options = Options::from_iter(args);
let options: Options = Options::from_iter(args); if opt.numtimes < 2 {
if options.numtimes < 2 {
panic!("numtimes must be >= 2") panic!("numtimes must be >= 2")
} }
let alloc = System; let alloc = System;
let alloc_name = if options.malloc { "libc-malloc" } else { "rust-system" }; let alloc_name = if opt.malloc { "libc-malloc" } else { "rust-system" };
fn mk_data<T: ArrayType, D, A: AllocatorType>(
opt: &Options, init: (T, T, T), scalar: T, dev: D, alloc: A,
) -> StreamData<T, D, A> {
StreamData::new_in(opt.arraysize, scalar, init, dev, alloc, opt.malloc, opt.init)
}
let num_thread_key = "BABELSTREAM_NUM_THREADS";
let max_ncores = num_cpus::get();
let ncores = match env::var(num_thread_key) {
Ok(v) => match v.parse::<i64>() {
Err(bad) => {
colour::e_yellow_ln!(
"Cannot parse {} (reason: {}), defaulting to {}",
bad,
num_thread_key,
max_ncores
);
max_ncores
}
Ok(n) if n <= 0 || n > max_ncores as i64 => {
println!("{} out of bound ({}), defaulting to {}", num_thread_key, n, max_ncores);
max_ncores
}
Ok(n) => n as usize,
},
Err(_) => {
println!("{} not set, defaulting to max ({})", num_thread_key, max_ncores);
max_ncores
}
};
let rayon_device = &|| { let rayon_device = &|| {
let dev = RayonDevice { pool: rayon::ThreadPoolBuilder::default().build().unwrap() }; let rayon_num_thread_key = "RAYON_NUM_THREADS";
if !options.csv { if env::var(rayon_num_thread_key).is_ok() {
colour::e_yellow_ln!("{} is ignored, set {} instead", rayon_num_thread_key, num_thread_key)
}
let dev = RayonDevice {
pool: rayon::ThreadPoolBuilder::default().num_threads(ncores).build().unwrap(),
};
if !opt.csv {
println!("Using {} thread(s), alloc={}", dev.pool.current_num_threads(), alloc_name); println!("Using {} thread(s), alloc={}", dev.pool.current_num_threads(), alloc_name);
if options.pin { if opt.pin {
colour::e_yellow_ln!("Pinning threads have no effect on Rayon!") colour::e_yellow_ln!("Pinning threads have no effect on Rayon!")
} }
} }
if options.float { if opt.float {
run_cpu( run_cpu(&opt, mk_data(&opt, FLOAT_INIT, FLOAT_INIT_SCALAR, dev, alloc))
&options,
StreamData::new_in(
options.arraysize,
FLOAT_INIT_SCALAR,
FLOAT_INIT,
dev,
alloc,
options.malloc,
options.init,
),
)
} else { } else {
run_cpu( run_cpu(&opt, mk_data(&opt, DOUBLE_INIT, DOUBLE_INIT_SCALAR, dev, alloc))
&options, }
StreamData::new_in( };
options.arraysize,
DOUBLE_START_SCALAR, let arc_device = &|| {
DOUBLE_INIT, if !opt.csv {
dev, println!("Using {} thread, pin={}, alloc={}", ncores, opt.pin, alloc_name);
alloc, }
options.malloc, if opt.float {
options.init, let dev = ArcDevice::<f32, _>::new(ncores, opt.pin, alloc);
), run_cpu(&opt, mk_data(&opt, FLOAT_INIT, FLOAT_INIT_SCALAR, dev, alloc))
) } else {
let dev = ArcDevice::<f64, _>::new(ncores, opt.pin, alloc);
run_cpu(&opt, mk_data(&opt, DOUBLE_INIT, DOUBLE_INIT_SCALAR, dev, alloc))
}
};
let unsafe_device = &|| {
if !opt.csv {
println!("Using {} thread, pin={}, alloc={}", ncores, opt.pin, alloc_name);
}
if opt.float {
let dev = UnsafeDevice::<f32>::new(ncores, opt.pin);
run_cpu(&opt, mk_data(&opt, FLOAT_INIT, FLOAT_INIT_SCALAR, dev, alloc))
} else {
let dev = UnsafeDevice::<f64>::new(ncores, opt.pin);
run_cpu(&opt, mk_data(&opt, DOUBLE_INIT, DOUBLE_INIT_SCALAR, dev, alloc))
} }
}; };
let crossbeam_device = &|| { let crossbeam_device = &|| {
let ncores = num_cpus::get(); let dev = CrossbeamDevice::new(ncores, opt.pin);
let dev = ThreadedDevice::new(ncores, options.pin); if !opt.csv {
if !options.csv { println!("Using {} thread(s), pin={}, alloc={}", ncores, opt.pin, alloc_name)
println!("Using {} thread(s), pin={}, alloc={}", ncores, options.pin, alloc_name)
} }
if options.float { if opt.float {
run_cpu( run_cpu(&opt, mk_data(&opt, FLOAT_INIT, FLOAT_INIT_SCALAR, dev, alloc))
&options,
StreamData::new_in(
options.arraysize,
FLOAT_INIT_SCALAR,
FLOAT_INIT,
dev,
alloc,
options.malloc,
options.init,
),
)
} else { } else {
run_cpu( run_cpu(&opt, mk_data(&opt, DOUBLE_INIT, DOUBLE_INIT_SCALAR, dev, alloc))
&options,
StreamData::new_in(
options.arraysize,
DOUBLE_START_SCALAR,
DOUBLE_INIT,
dev,
alloc,
options.malloc,
options.init,
),
)
} }
}; };
let st_device = &|| { let st_device = &|| {
let dev = SerialDevice { pin: options.pin }; let dev = SerialDevice { pin: opt.pin };
if !options.csv { if !opt.csv {
println!("Using 1 thread, pin={}, alloc={}", options.pin, alloc_name); println!("Using 1 thread, pin={}, alloc={}", opt.pin, alloc_name);
} }
if options.float { if opt.float {
run_cpu( run_cpu(&opt, mk_data(&opt, FLOAT_INIT, FLOAT_INIT_SCALAR, dev, alloc))
&options,
StreamData::new_in(
options.arraysize,
FLOAT_INIT_SCALAR,
FLOAT_INIT,
dev,
alloc,
options.malloc,
options.init,
),
)
} else { } else {
run_cpu( run_cpu(&opt, mk_data(&opt, DOUBLE_INIT, DOUBLE_INIT_SCALAR, dev, alloc))
&options,
StreamData::new_in(
options.arraysize,
DOUBLE_START_SCALAR,
DOUBLE_INIT,
dev,
alloc,
options.malloc,
options.init,
),
)
} }
}; };
let devices: Vec<(String, &'_ dyn Fn() -> bool)> = vec![ let devices: Vec<(String, &'_ dyn Fn() -> bool)> = vec![
("CPU (Rayon)".to_string(), rayon_device),
(format!("CPU (Crossbeam, pinning={})", options.pin), crossbeam_device),
("CPU (Single threaded)".to_string(), st_device), ("CPU (Single threaded)".to_string(), st_device),
("CPU (Rayon)".to_string(), rayon_device),
(format!("CPU (Arc, pinning={})", opt.pin), arc_device),
(format!("CPU (Unsafe, pinning={})", opt.pin), unsafe_device),
(format!("CPU (Crossbeam, pinning={})", opt.pin), crossbeam_device),
]; ];
if options.list { if opt.list {
devices.iter().enumerate().for_each(|(i, (name, _))| { devices.iter().enumerate().for_each(|(i, (name, _))| {
println!("[{}] {}", i, name); println!("[{}] {}", i, name);
}); });
true true
} else { } else {
match devices.get(options.device) { match devices.get(opt.device) {
Some((name, run)) => { Some((name, run)) => {
if !&options.csv { if !&opt.csv {
println!( println!(
"BabelStream\n\ "BabelStream\n\
Version: {}\n\ Version: {}\n\
@ -414,14 +420,14 @@ pub fn run(args: &Vec<String>) -> bool {
VERSION.unwrap_or("unknown"), VERSION.unwrap_or("unknown"),
name name
); );
if options.init { if opt.init {
println!("Initialising arrays on main thread"); println!("Initialising arrays on main thread");
} }
} }
run() run()
} }
None => { None => {
eprintln!("Device index {} not available", options.device); eprintln!("Device index {} not available", opt.device);
false false
} }
} }

View File

@ -124,6 +124,7 @@ impl<T: Default + Clone, D, A: AllocatorType> StreamData<T, D, A> {
pub trait RustStream<T: Default> { pub trait RustStream<T: Default> {
fn init_arrays(&mut self); fn init_arrays(&mut self);
fn read_arrays(&mut self) {} // default to no-op as most impl. doesn't need this
fn copy(&mut self); fn copy(&mut self);
fn mul(&mut self); fn mul(&mut self);
fn add(&mut self); fn add(&mut self);

View File

@ -0,0 +1,266 @@
extern crate core_affinity;
use std::alloc::Allocator;
use std::iter::Sum;
use std::ops::Range;
use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData};
use self::core_affinity::CoreId;
/// A raw, length-tagged pointer into a `Vec<T>`'s buffer.
///
/// This deliberately erases the borrow so that multiple worker threads can
/// write to the same allocation; all soundness rests on callers handing each
/// thread a disjoint index range (see `thread_ranges`).
#[derive(Debug, Copy, Clone)]
struct UnsafeData<T>(*mut T, usize);

impl<T: ArrayType> UnsafeData<T> {
  /// A zero-length placeholder used before `init_arrays` installs real buffers.
  ///
  /// Uses a dangling-but-aligned non-null pointer, which is exactly what
  /// `slice::from_raw_parts_mut` requires even for empty slices. (The previous
  /// form took `as_mut_ptr()` of a temporary array that was dropped at the end
  /// of the statement, leaving a pointer into a dead temporary.)
  fn empty() -> UnsafeData<T> {
    UnsafeData(std::ptr::NonNull::dangling().as_ptr(), 0)
  }

  /// Captures the buffer of `xs`. The vector must outlive every use of the
  /// returned value and must not be reallocated while it is in use.
  fn new<A: Allocator>(xs: &mut Vec<T, A>) -> UnsafeData<T> {
    UnsafeData(xs.as_mut_ptr(), xs.len())
  }

  /// Reconstructs a mutable slice over the captured buffer.
  fn get_slice(&self) -> &mut [T] {
    // SAFETY: relies on the caller's contract — the backing Vec is still
    // alive (see `new`) and concurrent users only touch disjoint indices.
    unsafe { std::slice::from_raw_parts_mut(self.0, self.1) }
  }
}

// SAFETY: UnsafeData is just a pointer + length; moving or sharing it across
// threads is sound as long as threads operate on disjoint index ranges, which
// the device's `thread_ranges` partitioning guarantees.
unsafe impl<T> Send for UnsafeData<T> {}
unsafe impl<T> Sync for UnsafeData<T> {}
/// Bundles the three raw array views (`a`, `b`, `c`) that every kernel
/// operates on, so a single `Copy` value can be moved into each worker
/// thread's closure.
#[derive(Debug, Copy, Clone)]
struct UnsafeRefs<T> {
  a: UnsafeData<T>,
  b: UnsafeData<T>,
  c: UnsafeData<T>,
}
// SAFETY: UnsafeRefs is a plain aggregate of pointer+length pairs; it is safe
// to send/share across threads provided each thread only touches a disjoint
// index range, which is how `thread_ranges` partitions the work.
unsafe impl<T> Send for UnsafeRefs<T> {}
unsafe impl<T> Sync for UnsafeRefs<T> {}
/// Thread-per-core device that hands each spawned worker a raw
/// (`UnsafeData`) view of the benchmark arrays.
pub struct UnsafeDevice<T: ArrayType> {
  // number of worker threads spawned for every kernel invocation
  pub(crate) ncore: usize,
  // whether each worker pins itself to a fixed core before running
  pub(crate) pin: bool,
  // one entry per worker; index t is the core that thread t pins to
  pub(crate) core_ids: Vec<CoreId>,
  // raw views of the a/b/c arrays; placeholders until `init_arrays` runs
  data: UnsafeRefs<T>,
}
impl<T: ArrayType> UnsafeDevice<T> {
  /// Creates a device that drives `ncore` worker threads, optionally pinning
  /// each one to a core.
  ///
  /// If core enumeration fails (or reports no cores) a warning is printed and
  /// sequential core ids are fabricated so the benchmark can still run; in
  /// that case pinning is best-effort only.
  pub fn new(ncore: usize, pin: bool) -> Self {
    let mut core_ids = core_affinity::get_core_ids().unwrap_or_default();
    if core_ids.is_empty() {
      colour::e_red_ln!("Cannot enumerate cores, pinning will not work if enabled");
      core_ids = (0..ncore).map(|i| CoreId { id: i }).collect();
    }
    // Pad/truncate to exactly one id per worker; guard the pad value so an
    // `ncore == 0` request does not panic on `core_ids[0]`.
    if let Some(&first) = core_ids.first() {
      core_ids.resize(ncore, first);
    }
    UnsafeDevice {
      ncore,
      pin,
      core_ids,
      data: UnsafeRefs { a: UnsafeData::empty(), b: UnsafeData::empty(), c: UnsafeData::empty() },
    }
  }

  /// Splits `0..len` into `ncore` contiguous, disjoint, in-order ranges,
  /// tagged with the worker index.
  ///
  /// Every range is clamped to `len`, so when `ncore` does not divide `len` —
  /// or even exceeds it — trailing workers simply receive empty ranges
  /// instead of ranges past the end of the arrays (which previously made the
  /// kernels index out of bounds and panic).
  fn thread_ranges(&self, len: usize) -> Vec<(usize, Range<usize>)> {
    if self.ncore == 0 {
      return Vec::new();
    }
    // ceil(len / ncore) without a round-trip through floating point
    let chunk = (len + self.ncore - 1) / self.ncore;
    (0..self.ncore)
      .map(|t| {
        let start = (t * chunk).min(len);
        // the last worker always runs to `len` to absorb any remainder
        let end = if t == self.ncore - 1 { len } else { ((t + 1) * chunk).min(len) };
        (t, start..end)
      })
      .collect::<Vec<_>>()
  }
}
// Unsafe threaded version, it should be semantically equal to the single threaded version
impl<T: 'static + ArrayType + Sync + Send + Sum, A: AllocatorType + Sync + Send> RustStream<T>
for StreamData<T, UnsafeDevice<T>, A>
{
fn init_arrays(&mut self) {
self.device.data.a = UnsafeData::new(&mut self.a);
self.device.data.b = UnsafeData::new(&mut self.b);
self.device.data.c = UnsafeData::new(&mut self.c);
let init = self.init;
let pin = self.device.pin;
let data = self.device.data;
self
.device
.thread_ranges(self.size)
.into_iter()
.map(|(t, r)| {
let core = self.device.core_ids[t];
std::thread::spawn(move || {
if pin {
core_affinity::set_for_current(core);
}
let a = data.a.get_slice();
let b = data.b.get_slice();
let c = data.c.get_slice();
for i in r {
a[i] = init.0;
b[i] = init.1;
c[i] = init.2;
}
})
})
.collect::<Vec<_>>()
.into_iter()
.for_each(|t| t.join().unwrap());
}
fn copy(&mut self) {
let pin = self.device.pin;
let data = self.device.data;
self
.device
.thread_ranges(self.size)
.into_iter()
.map(|(t, r)| {
let core = self.device.core_ids[t];
std::thread::spawn(move || {
if pin {
core_affinity::set_for_current(core);
}
let a = data.a.get_slice();
let c = data.c.get_slice();
for i in r {
c[i] = a[i];
}
})
})
.collect::<Vec<_>>()
.into_iter()
.for_each(|t| t.join().unwrap());
}
fn mul(&mut self) {
let scalar = self.scalar;
let pin = self.device.pin;
let data = self.device.data;
self
.device
.thread_ranges(self.size)
.into_iter()
.map(|(t, r)| {
let core = self.device.core_ids[t];
std::thread::spawn(move || {
if pin {
core_affinity::set_for_current(core);
}
let b = data.b.get_slice();
let c = data.c.get_slice();
for i in r {
b[i] = scalar * c[i];
}
})
})
.collect::<Vec<_>>()
.into_iter()
.for_each(|t| t.join().unwrap());
}
fn add(&mut self) {
let pin = self.device.pin;
let data = self.device.data;
self
.device
.thread_ranges(self.size)
.into_iter()
.map(|(t, r)| {
let core = self.device.core_ids[t];
std::thread::spawn(move || {
if pin {
core_affinity::set_for_current(core);
}
let a = data.a.get_slice();
let b = data.b.get_slice();
let c = data.c.get_slice();
for i in r {
c[i] = a[i] + b[i];
}
})
})
.collect::<Vec<_>>()
.into_iter()
.for_each(|t| t.join().unwrap());
}
fn triad(&mut self) {
let scalar = self.scalar;
let pin = self.device.pin;
let data = self.device.data;
self
.device
.thread_ranges(self.size)
.into_iter()
.map(|(t, r)| {
let core = self.device.core_ids[t];
std::thread::spawn(move || {
if pin {
core_affinity::set_for_current(core);
}
let a = data.a.get_slice();
let b = data.b.get_slice();
let c = data.c.get_slice();
for i in r {
a[i] = b[i] + scalar * c[i]
}
})
})
.collect::<Vec<_>>()
.into_iter()
.for_each(|t| t.join().unwrap());
}
fn nstream(&mut self) {
let scalar = self.scalar;
let pin = self.device.pin;
let data = self.device.data;
self
.device
.thread_ranges(self.size)
.into_iter()
.map(|(t, r)| {
let core = self.device.core_ids[t];
std::thread::spawn(move || {
if pin {
core_affinity::set_for_current(core);
}
let a = data.a.get_slice();
let b = data.b.get_slice();
let c = data.c.get_slice();
for i in r {
a[i] += b[i] + scalar * c[i]
}
})
})
.collect::<Vec<_>>()
.into_iter()
.for_each(|t| t.join().unwrap());
}
fn dot(&mut self) -> T {
let pin = self.device.pin;
let data = self.device.data;
self
.device
.thread_ranges(self.size)
.into_iter()
.map(|(t, r)| {
let core = self.device.core_ids[t];
std::thread::spawn(move || {
if pin {
core_affinity::set_for_current(core);
}
let a = data.a.get_slice();
let b = data.b.get_slice();
let mut p = T::default();
for i in r {
p += a[i] * b[i];
}
p
})
})
.collect::<Vec<_>>()
.into_iter()
.map(|t| t.join().unwrap())
.sum()
}
}

View File

@ -2,7 +2,7 @@ use rstest::rstest;
#[rstest] #[rstest]
fn test_main( fn test_main(
#[values(0, 1, 2)] device: usize, // #[values(0, 1, 2, 3, 4)] device: usize, //
#[values("", "--pin")] pin: &str, // #[values("", "--pin")] pin: &str, //
#[values("", "--malloc")] malloc: &str, // #[values("", "--malloc")] malloc: &str, //
#[values("", "--init")] init: &str, // #[values("", "--init")] init: &str, //