Add Crossbeam implementation

Add rustfmt and use target-cpu=native
Add option for libc malloc, basic thread pinning, touch-free allocation
Split modules
This commit is contained in:
Tom Lin 2021-06-15 23:13:14 +01:00
parent c70a5da45b
commit fdb2c181cc
10 changed files with 1322 additions and 411 deletions

View File

@ -0,0 +1,2 @@
[build]
rustflags = ["-C", "target-cpu=native"]

277
rust-stream/Cargo.lock generated
View File

@ -1,12 +1,14 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3
[[package]] [[package]]
name = "ansi_term" name = "ansi_term"
version = "0.11.0" version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
dependencies = [ dependencies = [
"winapi", "winapi 0.3.9",
] ]
[[package]] [[package]]
@ -17,7 +19,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [ dependencies = [
"hermit-abi", "hermit-abi",
"libc", "libc",
"winapi", "winapi 0.3.9",
] ]
[[package]] [[package]]
@ -54,10 +56,45 @@ dependencies = [
] ]
[[package]] [[package]]
name = "crossbeam-channel" name = "colour"
version = "0.5.0" version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775" checksum = "a27e4532f26f510c24bb8477d963c0c3ef27e293c3b2c507cccb0536d493201a"
dependencies = [
"crossterm",
]
[[package]]
name = "core_affinity"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f"
dependencies = [
"kernel32-sys",
"libc",
"num_cpus",
"winapi 0.2.8",
]
[[package]]
name = "crossbeam"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845"
dependencies = [
"cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"crossbeam-utils", "crossbeam-utils",
@ -76,9 +113,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-epoch" name = "crossbeam-epoch"
version = "0.9.3" version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12" checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"crossbeam-utils", "crossbeam-utils",
@ -88,16 +125,50 @@ dependencies = [
] ]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-queue"
version = "0.8.3" version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49" checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [ dependencies = [
"autocfg",
"cfg-if", "cfg-if",
"lazy_static", "lazy_static",
] ]
[[package]]
name = "crossterm"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c36c10130df424b2f3552fcc2ddcd9b28a27b1e54b358b45874f88d1ca6888c"
dependencies = [
"bitflags",
"crossterm_winapi",
"lazy_static",
"libc",
"mio",
"parking_lot",
"signal-hook",
"winapi 0.3.9",
]
[[package]]
name = "crossterm_winapi"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0da8964ace4d3e4a044fd027919b2237000b24315a37c916f61809f1ff2140b9"
dependencies = [
"winapi 0.3.9",
]
[[package]] [[package]]
name = "either" name = "either"
version = "1.6.1" version = "1.6.1"
@ -106,9 +177,9 @@ checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.3.2" version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac" checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [ dependencies = [
"unicode-segmentation", "unicode-segmentation",
] ]
@ -122,6 +193,25 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "instant"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
dependencies = [
"cfg-if",
]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.4.0" version = "1.4.0"
@ -130,19 +220,68 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.91" version = "0.2.97"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8916b1f6ca17130ec6568feccee27c156ad12037880833a3b842a823236502e7" checksum = "12b8adadd720df158f4d70dfe7ccc6adb0472d7c55ca83445f6a5ab3e36f8fb6"
[[package]]
name = "lock_api"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0382880606dff6d15c9476c416d18690b72742aa7b605bb6dd6ec9030fbf07eb"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "memoffset" name = "memoffset"
version = "0.6.1" version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "157b4208e3059a8f9e78d559edc658e13df41410cb3ae03979c83130067fdd87" checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9"
dependencies = [ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "mio"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf80d3e903b34e0bd7282b218398aec54e082c840d9baf8339e0080a0c542956"
dependencies = [
"libc",
"log",
"miow",
"ntapi",
"winapi 0.3.9",
]
[[package]]
name = "miow"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "ntapi"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
dependencies = [
"winapi 0.3.9",
]
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.14" version = "0.2.14"
@ -162,6 +301,31 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "parking_lot"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "proc-macro-error" name = "proc-macro-error"
version = "1.0.4" version = "1.0.4"
@ -188,9 +352,9 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.24" version = "1.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" checksum = "f0d8caf72986c1a598726adc988bb5984792ef84f5ee5aa50209145ee8077038"
dependencies = [ dependencies = [
"unicode-xid", "unicode-xid",
] ]
@ -206,9 +370,9 @@ dependencies = [
[[package]] [[package]]
name = "rayon" name = "rayon"
version = "1.5.0" version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b0d8e0819fadc20c74ea8373106ead0600e3a67ef1fe8da56e39b9ae7275674" checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"crossbeam-deque", "crossbeam-deque",
@ -218,9 +382,9 @@ dependencies = [
[[package]] [[package]]
name = "rayon-core" name = "rayon-core"
version = "1.9.0" version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ab346ac5921dc62ffa9f89b7a773907511cdfa5490c572ae9be1be33e8afa4a" checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e"
dependencies = [ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"crossbeam-deque", "crossbeam-deque",
@ -229,22 +393,69 @@ dependencies = [
"num_cpus", "num_cpus",
] ]
[[package]]
name = "redox_syscall"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "742739e41cd49414de871ea5e549afb7e2a3ac77b589bcbebe8c82fab37147fc"
dependencies = [
"bitflags",
]
[[package]] [[package]]
name = "rust-stream" name = "rust-stream"
version = "3.4.0" version = "3.4.0"
dependencies = [ dependencies = [
"colour",
"core_affinity",
"crossbeam",
"libc",
"num-traits", "num-traits",
"num_cpus",
"rayon", "rayon",
"rustversion",
"structopt", "structopt",
"tabular", "tabular",
] ]
[[package]]
name = "rustversion"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61b3909d758bb75c79f23d4736fac9433868679d3ad2ea7a61e3c25cfda9a088"
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.1.0" version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "signal-hook"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e31d442c16f047a671b5a71e2161d6e68814012b7f5379d269ebd915fac2729"
dependencies = [
"libc",
"mio",
"signal-hook-registry",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
dependencies = [
"libc",
]
[[package]]
name = "smallvec"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.8.0" version = "0.8.0"
@ -277,9 +488,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.64" version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fd9d1e9976102a03c542daa2eff1b43f9d72306342f3f8b3ed5fb8908195d6f" checksum = "f71489ff30030d2ae598524f61326b902466f72a0fb1a8564c001cc63425bcc7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -318,9 +529,9 @@ checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
[[package]] [[package]]
name = "unicode-xid" name = "unicode-xid"
version = "0.2.1" version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]] [[package]]
name = "vec_map" name = "vec_map"
@ -334,6 +545,12 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"
@ -344,6 +561,12 @@ dependencies = [
"winapi-x86_64-pc-windows-gnu", "winapi-x86_64-pc-windows-gnu",
] ]
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
[[package]] [[package]]
name = "winapi-i686-pc-windows-gnu" name = "winapi-i686-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"

View File

@ -10,4 +10,22 @@ edition = "2018"
num-traits = "0.2.14" num-traits = "0.2.14"
structopt = "0.3.13" structopt = "0.3.13"
tabular = "0.1.4" tabular = "0.1.4"
rayon = "1.5" rayon = "1.5.1"
crossbeam = "0.8.1"
num_cpus = "1.13.0"
rustversion = "1.0"
libc = "0.2.97"
core_affinity = "0.5.10"
colour = "0.6.0"
[build-dependencies]
rustversion = "1.0"
[profile.dev]
opt-level = 2
overflow-checks = true
[profile.release]
opt-level = 3
lto = "thin" # fully enabling this (i.e true) negatively affects performance as tested on both AMD and Intel

72
rust-stream/README.md Normal file
View File

@ -0,0 +1,72 @@
rust-stream
===========
This is an implementation of BabelStream in Rust.
Currently, we support three CPU threading API as devices:
* Plain - basic single-threaded `for` version, see [plain_stream.rs](src/plain_stream.rs)
* [Rayon](https://github.com/rayon-rs/rayon) - Parallel with high level API,
see [rayon_stream.rs](src/rayon_stream.rs)
* [Crossbeam](https://github.com/crossbeam-rs/crossbeam) - Parallel with partitions per thread,
see [crossbeam_stream.rs](src/crossbeam_stream.rs)
In addition, this implementation also supports the following extra flags:
```
--init Initialise each benchmark array at allocation time on the main thread
--malloc Use libc malloc instead of the Rust's allocator for benchmark array allocation
--pin Pin threads to distinct cores, this has NO effect in Rayon devices
```
There is an ongoing investigation on potential performance issues under NUMA situations. As part of
the experiment, this implementation made use of the
provisional [Allocator traits](https://github.com/rust-lang/rust/issues/32838) which requires rust
unstable. We hope a NUMA aware allocator will be available once the allocator API reaches rust
stable.
### Build & Run
Prerequisites:
* [Rust toolchain](https://www.rust-lang.org/tools/install)
Once the toolchain is installed, enable the nightly channel:
```shell
> rustup install nightly
> rustup default nightly # optional, this sets `+nightly` automatically for cargo calls later
```
With `cargo` on path, compile and run the benchmark with:
```shell
> cd rust-stream/
> cargo +nightly build --release # or simply `cargo build --release` if nightly channel is the default
> ./target/release/rust-stream --help
rust-stream 3.4.0
USAGE:
rust-stream [FLAGS] [OPTIONS]
FLAGS:
--csv Output as csv table
--float Use floats (rather than doubles)
-h, --help Prints help information
--init Initialise each benchmark array at allocation time on the main thread
--list List available devices
--malloc Use libc malloc instead of the Rust's allocator for benchmark array allocation
--mibibytes Use MiB=2^20 for bandwidth calculation (default MB=10^6)
--nstream-only Only run nstream
--pin Pin threads to distinct cores, this has NO effect in Rayon devices
--triad-only Only run triad
-V, --version Prints version information
OPTIONS:
-s, --arraysize <arraysize> Use <arraysize> elements in the array [default: 33554432]
--device <device> Select device at <device> [default: 0]
-n, --numtimes <numtimes> Run the test <numtimes> times (NUM >= 2) [default: 100]
```

68
rust-stream/rustfmt.toml Normal file
View File

@ -0,0 +1,68 @@
max_width = 100
hard_tabs = false
tab_spaces = 2
newline_style = "Auto"
use_small_heuristics = "Max"
indent_style = "Block"
wrap_comments = false
format_code_in_doc_comments = false
comment_width = 80
normalize_comments = false
normalize_doc_attributes = false
license_template_path = ""
format_strings = false
format_macro_matchers = false
format_macro_bodies = true
empty_item_single_line = true
struct_lit_single_line = true
fn_single_line = true
where_single_line = true
imports_indent = "Block"
imports_layout = "Mixed"
imports_granularity = "Preserve"
group_imports = "Preserve"
reorder_imports = true
reorder_modules = true
reorder_impl_items = false
type_punctuation_density = "Wide"
space_before_colon = false
space_after_colon = true
spaces_around_ranges = false
binop_separator = "Front"
remove_nested_parens = true
combine_control_expr = true
overflow_delimited_expr = false
struct_field_align_threshold = 0
enum_discrim_align_threshold = 0
match_arm_blocks = true
match_arm_leading_pipes = "Never"
force_multiline_blocks = false
fn_args_layout = "Compressed"
brace_style = "PreferSameLine"
control_brace_style = "AlwaysSameLine"
trailing_semicolon = true
trailing_comma = "Vertical"
match_block_trailing_comma = false
blank_lines_upper_bound = 1
blank_lines_lower_bound = 0
edition = "2015"
version = "One"
inline_attribute_width = 0
merge_derives = true
use_try_shorthand = false
use_field_init_shorthand = false
force_explicit_abi = true
condense_wildcard_suffixes = false
color = "Auto"
required_version = "1.4.36"
unstable_features = false
disable_all_formatting = false
skip_children = false
hide_parse_errors = false
error_on_line_overflow = false
error_on_unformatted = false
report_todo = "Never"
report_fixme = "Never"
ignore = []
emit_mode = "Files"
make_backup = false

View File

@ -0,0 +1,223 @@
use std::iter::Sum;
use std::slice::{Chunks, ChunksMut};
use crossbeam::thread;
use self::core_affinity::CoreId;
use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData};
pub struct ThreadedDevice {
pub(crate) ncore: usize,
pub(crate) pin: bool,
pub(crate) core_ids: Vec<CoreId>,
}
impl ThreadedDevice {
pub fn new(ncore: usize, pin: bool) -> Self {
let mut core_ids = match core_affinity::get_core_ids() {
Some(xs) => xs,
None => {
colour::e_red_ln!("Cannot enumerate cores, pinning will not work if enabled");
(0..ncore).map(|i| CoreId { id: i }).collect()
}
};
core_ids.resize(ncore, core_ids[0]);
ThreadedDevice { ncore, pin, core_ids }
}
}
impl ThreadedDevice {
// divide the length by the number of cores, the last core gets less work if it does not divide
fn chunk_size(&self, len: usize) -> usize {
(len as f64 / self.ncore as f64).ceil() as usize
}
// make a mutable chunk from the vec
fn mk_mut_chunks<'a, T, A: AllocatorType>(&self, xs: &'a mut Vec<T, A>) -> ChunksMut<'a, T> {
let len = xs.len();
xs.chunks_mut(self.chunk_size(len))
}
// make a immutable chunk from the vec
fn mk_chunks<'a, T, A: AllocatorType>(&self, xs: &'a mut Vec<T, A>) -> Chunks<'a, T> {
xs.chunks(self.chunk_size(xs.len()))
}
}
extern crate core_affinity;
// Crossbeam threaded version, it should be semantically equal to the single threaded version
impl<T: ArrayType + Sync + Send + Sum, A: AllocatorType + Sync + Send> RustStream<T>
for StreamData<T, ThreadedDevice, A>
{
fn init_arrays(&mut self) {
thread::scope(|s| {
let init = self.init;
let pin = self.device.pin;
for (t, ((a, b), c)) in self.device.core_ids.iter().zip(
self
.device
.mk_mut_chunks(&mut self.a)
.zip(self.device.mk_mut_chunks(&mut self.b))
.zip(self.device.mk_mut_chunks(&mut self.c)),
) {
s.spawn(move |_| {
if pin {
core_affinity::set_for_current(*t);
}
for x in a.into_iter() {
*x = init.0;
}
for x in b.into_iter() {
*x = init.1;
}
for x in c.into_iter() {
*x = init.2;
}
});
}
})
.unwrap()
}
fn copy(&mut self) {
thread::scope(|s| {
let pin = self.device.pin;
for (t, (c, a)) in self
.device
.core_ids
.iter()
.zip(self.device.mk_mut_chunks(&mut self.c).zip(self.device.mk_chunks(&mut self.a)))
{
s.spawn(move |_| {
if pin {
core_affinity::set_for_current(*t);
}
for i in 0..c.len() {
c[i] = a[i];
}
});
}
})
.unwrap()
}
fn mul(&mut self) {
thread::scope(|s| {
let pin = self.device.pin;
let scalar = self.scalar;
for (t, (b, c)) in self
.device
.core_ids
.iter()
.zip(self.device.mk_mut_chunks(&mut self.b).zip(self.device.mk_chunks(&mut self.c)))
{
s.spawn(move |_| {
if pin {
core_affinity::set_for_current(*t);
}
for i in 0..b.len() {
b[i] = scalar * c[i];
}
});
}
})
.unwrap()
}
fn add(&mut self) {
thread::scope(|s| {
let pin = self.device.pin;
for (t, (c, (a, b))) in (&mut self.device.core_ids.iter()).zip(
self
.device
.mk_mut_chunks(&mut self.c)
.zip(self.device.mk_chunks(&mut self.a).zip(self.device.mk_chunks(&mut self.b))),
) {
s.spawn(move |_| {
if pin {
core_affinity::set_for_current(*t);
}
for i in 0..c.len() {
c[i] = a[i] + b[i];
}
});
}
})
.unwrap()
}
fn triad(&mut self) {
thread::scope(|s| {
let pin = self.device.pin;
let scalar = self.scalar;
for (t, (a, (b, c))) in self.device.core_ids.iter().zip(
self
.device
.mk_mut_chunks(&mut self.a)
.zip(self.device.mk_chunks(&mut self.b).zip(self.device.mk_chunks(&mut self.c))),
) {
s.spawn(move |_| {
if pin {
core_affinity::set_for_current(*t);
}
for i in 0..a.len() {
a[i] = b[i] + scalar * c[i]
}
});
}
})
.unwrap()
}
fn nstream(&mut self) {
thread::scope(|s| {
let pin = self.device.pin;
let scalar = self.scalar;
for (t, (a, (b, c))) in self.device.core_ids.iter().zip(
self
.device
.mk_mut_chunks(&mut self.a)
.zip(self.device.mk_chunks(&mut self.b).zip(self.device.mk_chunks(&mut self.c))),
) {
s.spawn(move |_| {
if pin {
core_affinity::set_for_current(*t);
}
for i in 0..a.len() {
a[i] += b[i] + scalar * c[i]
}
});
}
})
.unwrap()
}
fn dot(&mut self) -> T {
let mut partial_sum = vec![T::zero(); self.device.ncore];
thread::scope(|s| {
let pin = self.device.pin;
let a = &self.a;
let b = &self.b;
let chunk_indices = |i: usize| {
let chunk_size = self.device.chunk_size(self.size);
let start = i * chunk_size;
start..((start + chunk_size).min(self.size))
};
for (t, (n, acc)) in self.device.core_ids.iter().zip(partial_sum.iter_mut().enumerate()) {
s.spawn(move |_| {
if pin {
core_affinity::set_for_current(*t);
}
let mut p = T::zero();
for i in chunk_indices(n) {
p += a[i] * b[i];
}
*acc = p;
});
}
})
.unwrap();
partial_sum.into_iter().sum()
}
}

View File

@ -1,240 +1,95 @@
#![feature(allocator_api)]
#![feature(vec_into_raw_parts)]
use std::alloc::System;
use std::fmt::{Debug, Display}; use std::fmt::{Debug, Display};
use std::iter::Sum; use std::iter::Sum;
use std::mem::size_of; use std::mem::size_of;
use std::time::{Duration, Instant}; use std::time::Duration;
use num_traits::{abs, NumAssign, Signed}; use num_traits::abs;
use num_traits::real::Real;
use rayon::prelude::*;
use structopt::StructOpt; use structopt::StructOpt;
use tabular::{Row, Table}; use tabular::{Row, Table};
use crate::crossbeam_stream::ThreadedDevice;
use crate::plain_stream::SerialDevice;
use crate::rayon_stream::RayonDevice;
use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData};
mod crossbeam_stream;
mod plain_stream;
mod rayon_stream;
mod stream;
#[derive(Debug, StructOpt)] #[derive(Debug, StructOpt)]
struct Options { struct Options {
/// List available devices /// List available devices
#[structopt(long)] list: bool, #[structopt(long)]
list: bool,
/// Select device at <device> /// Select device at <device>
#[structopt(long, default_value = "0")] device: usize, #[structopt(long, default_value = "0")]
device: usize,
/// Run the test <numtimes> times (NUM >= 2) /// Run the test <numtimes> times (NUM >= 2)
#[structopt(long, default_value = "100")] numtimes: usize, #[structopt(long, short = "n", default_value = "100")]
numtimes: usize,
/// Use <arraysize> elements in the array /// Use <arraysize> elements in the array
#[structopt(long, default_value = "33554432")] arraysize: usize, #[structopt(long, short = "s", default_value = "33554432")]
arraysize: usize,
/// Use floats (rather than doubles) /// Use floats (rather than doubles)
#[structopt(long)] float: bool, #[structopt(long)]
float: bool,
/// Only run triad /// Only run triad
#[structopt(long)] triad_only: bool, #[structopt(long)]
triad_only: bool,
/// Only run nstream /// Only run nstream
#[structopt(long)] nstream_only: bool, #[structopt(long)]
nstream_only: bool,
/// Output as csv table /// Output as csv table
#[structopt(long)] csv: bool, #[structopt(long)]
csv: bool,
/// Use MiB=2^20 for bandwidth calculation (default MB=10^6) /// Use MiB=2^20 for bandwidth calculation (default MB=10^6)
#[structopt(long)] mibibytes: bool, #[structopt(long)]
mibibytes: bool,
/// Use libc malloc instead of the Rust's allocator for benchmark array allocation
#[structopt(name = "malloc", long)]
malloc: bool,
/// Initialise each benchmark array at allocation time on the main thread
#[structopt(name = "init", long)]
init: bool,
/// Pin threads to distinct cores, this has NO effect in Rayon devices
#[structopt(long)]
pin: bool,
} }
#[derive(PartialEq)] #[derive(PartialEq)]
enum Benchmark { All, Triad, NStream } enum Benchmark {
All,
struct StreamData<T> { Triad,
size: usize, NStream,
scalar: T,
a: Vec<T>,
b: Vec<T>,
c: Vec<T>,
} }
impl<T: Default + Clone> StreamData<T> { fn check_solution<T: ArrayType + Display + Sum + Into<f64>, D, A: AllocatorType>(
pub fn new(size: usize, scalar: T) -> StreamData<T> { benchmark: Benchmark, numtimes: usize, vec: &StreamData<T, D, A>, dot_sum: Option<T>,
StreamData { ) {
size, let (mut gold_a, mut gold_b, mut gold_c) = vec.init;
scalar,
a: vec![T::default(); size],
b: vec![T::default(); size],
c: vec![T::default(); size],
}
}
}
struct PlainFor;
struct RayonPar;
#[inline(always)]
fn timed<F: FnOnce()>(f: F) -> Duration {
let start = Instant::now();
f();
start.elapsed()
}
#[inline(always)]
fn timed_mut<T, F: FnMut() -> T>(f: &mut F) -> (Duration, T) {
let start = Instant::now();
let x = f();
(start.elapsed(), x)
}
struct AllTiming<T> { copy: T, mul: T, add: T, triad: T, dot: T }
trait RustStream<T: Default, K> {
fn init_arrays(&mut self, init: (T, T, T));
fn copy(&mut self);
fn mul(&mut self);
fn add(&mut self);
fn triad(&mut self);
fn nstream(&mut self);
fn dot(&mut self) -> T;
fn run_all(&mut self, n: usize) -> (AllTiming<Vec<Duration>>, T) {
let mut timings: AllTiming<Vec<Duration>> = AllTiming {
copy: vec![Duration::default(); n],
mul: vec![Duration::default(); n],
add: vec![Duration::default(); n],
triad: vec![Duration::default(); n],
dot: vec![Duration::default(); n],
};
let mut last_sum = T::default();
for i in 0..n {
timings.copy[i] = timed(|| self.copy());
timings.mul[i] = timed(|| self.mul());
timings.add[i] = timed(|| self.add());
timings.triad[i] = timed(|| self.triad());
let (dot, sum) = timed_mut(&mut || self.dot());
timings.dot[i] = dot;
last_sum = sum;
}
(timings, last_sum)
}
fn run_triad(&mut self, n: usize) -> Duration {
timed(|| for _ in 0..n { self.triad(); })
}
fn run_nstream(&mut self, n: usize) -> Vec<Duration> {
(0..n).map(|_| timed(|| self.nstream())).collect::<Vec<_>>()
}
}
trait ArrayType: Real + NumAssign + Signed + Default {}
impl<T: Real + NumAssign + Signed + Default> ArrayType for T {}
// single threaded version
impl<T: ArrayType> RustStream<T, PlainFor> for StreamData<T> {
fn init_arrays(&mut self, init: (T, T, T)) {
self.a.fill(init.0);
self.b.fill(init.1);
self.c.fill(init.2);
}
fn copy(&mut self) {
for i in 0..self.size {
self.c[i] = self.a[i];
}
}
fn mul(&mut self) {
for i in 0..self.size {
self.b[i] = self.scalar * self.c[i];
}
}
fn add(&mut self) {
for i in 0..self.size {
self.c[i] = self.a[i] + self.b[i];
}
}
fn triad(&mut self) {
for i in 0..self.size {
self.a[i] = self.b[i] + self.scalar * self.c[i];
}
}
fn nstream(&mut self) {
for i in 0..self.size {
self.a[i] += self.b[i] * self.scalar * self.c[i];
}
}
fn dot(&mut self) -> T {
let mut sum: T = T::default();
for i in 0..self.size {
sum += self.a[i] * self.b[i];
}
sum
}
}
// Rayon version, it should be semantically equal to the single threaded version
impl<T: ArrayType + Sync + Send + Sum> RustStream<T, RayonPar> for StreamData<T> {
fn init_arrays(&mut self, init: (T, T, T)) {
self.a.fill(init.0);
self.b.fill(init.1);
self.c.fill(init.2);
}
fn copy(&mut self) {
let a = &self.a;
self.c.par_iter_mut().enumerate().for_each(|(i, c)| *c = a[i])
}
fn mul(&mut self) {
let c = &self.c;
let scalar = &self.scalar;
self.b.par_iter_mut().enumerate().for_each(|(i, b)| *b = *scalar * c[i])
}
fn add(&mut self) {
let a = &self.a;
let b = &self.b;
self.c.par_iter_mut().enumerate().for_each(|(i, c)| *c = a[i] + b[i])
}
fn triad(&mut self) {
let scalar = &self.scalar;
let b = &self.b;
let c = &self.c;
self.a.par_iter_mut().enumerate().for_each(|(i, a)| *a = b[i] + *scalar * c[i])
}
fn nstream(&mut self) {
let scalar = &self.scalar;
let b = &self.b;
let c = &self.c;
self.a.par_iter_mut().enumerate().for_each(|(i, a)| *a += b[i] + *scalar * c[i])
}
fn dot(&mut self) -> T {
let a = &self.a;
let b = &self.b;
(0..self.size).into_par_iter().fold(|| T::default(), |acc, i| acc + a[i] * b[i]).sum::<T>()
}
}
fn validate<T: ArrayType + Display + Sum + Into<f64>>(
benchmark: Benchmark,
numtimes: usize,
vec: &StreamData<T>,
dot_sum: Option<T>,
scalar: T, init: (T, T, T)) {
let (mut gold_a, mut gold_b, mut gold_c) = init;
for _ in 0..numtimes { for _ in 0..numtimes {
match benchmark { match benchmark {
Benchmark::All => { Benchmark::All => {
gold_c = gold_a; gold_c = gold_a;
gold_b = scalar * gold_c; gold_b = vec.scalar * gold_c;
gold_c = gold_a + gold_b; gold_c = gold_a + gold_b;
gold_a = gold_b + scalar * gold_c; gold_a = gold_b + vec.scalar * gold_c;
} }
Benchmark::Triad => { Benchmark::Triad => {
gold_a = gold_b + scalar * gold_c; gold_a = gold_b + vec.scalar * gold_c;
} }
Benchmark::NStream => { Benchmark::NStream => {
gold_a += gold_b + scalar * gold_c; gold_a += gold_b + vec.scalar * gold_c;
} }
}; };
} }
let tolerance = T::epsilon().into() * 100.0f64; let tolerance = T::epsilon().into() * 100.0f64;
let validate_xs = |name: &str, xs: &Vec<T>, from: T| { let validate_xs = |name: &str, xs: &Vec<T, A>, from: T| {
let error = (xs.iter().map(|x| abs(*x - from)).sum::<T>()).into() / xs.len() as f64; let error = (xs.iter().map(|x| abs(*x - from)).sum::<T>()).into() / xs.len() as f64;
if error > tolerance { if error > tolerance {
eprintln!("Validation failed on {}[]. Average error {} ", name, error) eprintln!("Validation failed on {}[]. Average error {} ", name, error)
@ -248,46 +103,69 @@ fn validate<T: ArrayType + Display + Sum + Into<f64>>(
let gold_sum = (gold_a * gold_b).into() * vec.size as f64; let gold_sum = (gold_a * gold_b).into() * vec.size as f64;
let error = abs((sum.into() - gold_sum) / gold_sum); let error = abs((sum.into() - gold_sum) / gold_sum);
if error > 1.0e-8 { if error > 1.0e-8 {
eprintln!("Validation failed on sum. Error {} \nSum was {} but should be {}", error, sum, gold_sum); eprintln!(
"Validation failed on sum. Error {} \nSum was {} but should be {}",
error, sum, gold_sum
);
} }
} }
} }
fn run_cpu<T: ArrayType + Sync + Send + Sum + Into<f64> + Display>(option: Options, scalar: T, init: (T, T, T)) { fn run_cpu<T: ArrayType + Sync + Send + Sum + Into<f64> + Display, D, A: AllocatorType>(
option: &Options, mut stream: StreamData<T, D, A>,
) where
StreamData<T, D, A>: RustStream<T>,
{
let benchmark = match (option.nstream_only, option.triad_only) { let benchmark = match (option.nstream_only, option.triad_only) {
(true, false) => Benchmark::NStream, (true, false) => Benchmark::NStream,
(false, true) => Benchmark::Triad, (false, true) => Benchmark::Triad,
(false, false) => Benchmark::All, (false, false) => Benchmark::All,
(true, true) => panic!("Both triad and nstream are enabled, pick one or omit both to run all benchmarks"), (true, true) => {
panic!("Both triad and nstream are enabled, pick one or omit both to run all benchmarks")
}
}; };
let array_bytes = option.arraysize * size_of::<T>(); let array_bytes = option.arraysize * size_of::<T>();
let total_bytes = array_bytes * 3; let total_bytes = array_bytes * 3;
let (mega_scale, mega_suffix, giga_scale, giga_suffix) = let (mega_scale, mega_suffix, giga_scale, giga_suffix) = if !option.mibibytes {
if !option.mibibytes { (1.0e-6, "MB", 1.0e-9, "GB") } else { (2f64.powi(-20), "MiB", 2f64.powi(-30), "GiB") }; (1.0e-6, "MB", 1.0e-9, "GB")
} else {
(2f64.powi(-20), "MiB", 2f64.powi(-30), "GiB")
};
if !option.csv { if !option.csv {
println!("Running {} {} times", match benchmark { println!(
"Running {} {} times",
match benchmark {
Benchmark::All => "kernels", Benchmark::All => "kernels",
Benchmark::Triad => "triad", Benchmark::Triad => "triad",
Benchmark::NStream => "nstream", Benchmark::NStream => "nstream",
}, option.numtimes); },
option.numtimes
);
if benchmark == Benchmark::Triad { if benchmark == Benchmark::Triad {
println!("Number of elements: {}", option.arraysize); println!("Number of elements: {}", option.arraysize);
} }
println!("Precision: {}", if option.float { "float" } else { "double" }); println!("Precision: {}", if option.float { "float" } else { "double" });
println!("Array size: {:.1} {}(={:.1} {})", println!(
mega_scale * array_bytes as f64, mega_suffix, giga_scale * array_bytes as f64, giga_suffix); "Array size: {:.1} {} (={:.1} {})",
println!("Total size: {:.1} {}(={:.1} {})", mega_scale * array_bytes as f64,
mega_scale * total_bytes as f64, mega_suffix, giga_scale * total_bytes as f64, giga_suffix); mega_suffix,
giga_scale * array_bytes as f64,
giga_suffix
);
println!(
"Total size: {:.1} {} (={:.1} {})",
mega_scale * total_bytes as f64,
mega_suffix,
giga_scale * total_bytes as f64,
giga_suffix
);
} }
stream.init_arrays();
let mut vec: StreamData<T> = StreamData::<T>::new(option.arraysize, scalar);
let stream = &mut vec as &mut dyn RustStream<T, RayonPar>;
stream.init_arrays(init);
let tabulate = |xs: &Vec<Duration>, name: &str, t_size: usize| -> Vec<(&str, String)> { let tabulate = |xs: &Vec<Duration>, name: &str, t_size: usize| -> Vec<(&str, String)> {
let tail = &xs[1..]; // tail only let tail = &xs[1..]; // tail only
@ -304,7 +182,10 @@ fn run_cpu<T: ArrayType + Sync + Send + Sum + Into<f64> + Display>(option: Optio
("num_times", option.numtimes.to_string()), ("num_times", option.numtimes.to_string()),
("n_elements", option.arraysize.to_string()), ("n_elements", option.arraysize.to_string()),
("sizeof", t_size.to_string()), ("sizeof", t_size.to_string()),
(if option.mibibytes { "max_mibytes_per_sec" } else { "max_mbytes_per_sec" }, mbps.to_string()), (
if option.mibibytes { "max_mibytes_per_sec" } else { "max_mbytes_per_sec" },
mbps.to_string(),
),
("min_runtime", min.to_string()), ("min_runtime", min.to_string()),
("max_runtime", max.to_string()), ("max_runtime", max.to_string()),
("avg_runtime", avg.to_string()), ("avg_runtime", avg.to_string()),
@ -319,7 +200,7 @@ fn run_cpu<T: ArrayType + Sync + Send + Sum + Into<f64> + Display>(option: Optio
] ]
} }
} }
(_, _) => panic!("No min/max element for {}(size={})", name, t_size) (_, _) => panic!("No min/max element for {}(size={})", name, t_size),
} }
}; };
@ -337,17 +218,17 @@ fn run_cpu<T: ArrayType + Sync + Send + Sum + Into<f64> + Display>(option: Optio
for kvs in xs { for kvs in xs {
table.add_row(kvs.iter().fold(Row::new(), |row, (_, val)| row.with_cell(val))); table.add_row(kvs.iter().fold(Row::new(), |row, (_, val)| row.with_cell(val)));
} }
println!("{}", table); print!("{}", table);
} }
} }
_ => panic!("Empty tabulation") _ => panic!("Empty tabulation"),
}; };
}; };
match benchmark { match benchmark {
Benchmark::All => { Benchmark::All => {
let (results, sum) = stream.run_all(option.numtimes); let (results, sum) = stream.run_all(option.numtimes);
validate(benchmark, option.numtimes, &vec, Some(sum), scalar, init); check_solution(benchmark, option.numtimes, &stream, Some(sum));
tabulate_all(vec![ tabulate_all(vec![
tabulate(&results.copy, "Copy", 2 * array_bytes), tabulate(&results.copy, "Copy", 2 * array_bytes),
tabulate(&results.mul, "Mul", 2 * array_bytes), tabulate(&results.mul, "Mul", 2 * array_bytes),
@ -358,20 +239,19 @@ fn run_cpu<T: ArrayType + Sync + Send + Sum + Into<f64> + Display>(option: Optio
} }
Benchmark::NStream => { Benchmark::NStream => {
let results = stream.run_nstream(option.numtimes); let results = stream.run_nstream(option.numtimes);
validate(benchmark, option.numtimes, &vec, None, scalar, init); check_solution(benchmark, option.numtimes, &stream, None);
tabulate_all(vec![ tabulate_all(vec![tabulate(&results, "Nstream", 4 * array_bytes)]);
tabulate(&results, "Nstream", 4 * array_bytes)
]);
} }
Benchmark::Triad => { Benchmark::Triad => {
let results = stream.run_triad(option.numtimes); let results = stream.run_triad(option.numtimes);
let total_bytes = 3 * array_bytes * option.numtimes; let total_bytes = 3 * array_bytes * option.numtimes;
let bandwidth = mega_scale * (total_bytes as f64 / results.as_secs_f64()); let bandwidth = giga_scale * (total_bytes as f64 / results.as_secs_f64());
println!("Runtime (seconds): {:.5}", results.as_secs_f64()); println!("Runtime (seconds): {:.5}", results.as_secs_f64());
println!("Bandwidth ({}/s): {:.3} ", giga_suffix, bandwidth); println!("Bandwidth ({}/s): {:.3} ", giga_suffix, bandwidth);
} }
}; };
&stream.clean_up();
} }
const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION"); const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
@ -381,33 +261,154 @@ static START_B: f32 = 0.2;
static START_C: f32 = 0.0; static START_C: f32 = 0.0;
static START_SCALAR: f32 = 0.4; static START_SCALAR: f32 = 0.4;
static FLOAT_INIT_SCALAR: f32 = START_SCALAR;
static FLOAT_INIT: (f32, f32, f32) = (START_A, START_B, START_C);
static DOUBLE_START_SCALAR: f64 = START_SCALAR as f64;
static DOUBLE_INIT: (f64, f64, f64) = (START_A as f64, START_B as f64, START_C as f64);
fn main() { fn main() {
let options: Options = Options::from_args(); let options: Options = Options::from_args();
// only CPU via Rayon for now if options.numtimes < 2 {
let devices = vec![("CPU (Rayon)", |opt: Options| { panic!("numtimes must be >= 2")
if opt.float {
run_cpu::<f32>(opt, START_SCALAR, (START_A, START_B, START_C));
} else {
run_cpu::<f64>(opt, START_SCALAR.into(), (START_A.into(), START_B.into(), START_C.into()));
} }
})];
let alloc = System;
let alloc_name = if options.malloc { "libc-malloc" } else { "rust-system" };
let rayon_device = &|| {
let dev = RayonDevice { pool: rayon::ThreadPoolBuilder::default().build().unwrap() };
if !options.csv {
println!("Using {} thread(s), alloc={}", dev.pool.current_num_threads(), alloc_name);
if options.pin {
colour::e_yellow_ln!("Pinning threads have no effect on Rayon!")
}
}
if options.float {
run_cpu(
&options,
StreamData::new_in(
options.arraysize,
FLOAT_INIT_SCALAR,
FLOAT_INIT,
dev,
alloc,
options.malloc,
options.init,
),
);
} else {
run_cpu(
&options,
StreamData::new_in(
options.arraysize,
DOUBLE_START_SCALAR,
DOUBLE_INIT,
dev,
alloc,
options.malloc,
options.init,
),
);
}
};
let crossbeam_device = &|| {
let ncores = num_cpus::get();
let dev = ThreadedDevice::new(ncores, options.pin);
if !options.csv {
println!("Using {} thread(s), pin={}, alloc={}", ncores, options.pin, alloc_name)
}
if options.float {
run_cpu(
&options,
StreamData::new_in(
options.arraysize,
FLOAT_INIT_SCALAR,
FLOAT_INIT,
dev,
alloc,
options.malloc,
options.init,
),
);
} else {
run_cpu(
&options,
StreamData::new_in(
options.arraysize,
DOUBLE_START_SCALAR,
DOUBLE_INIT,
dev,
alloc,
options.malloc,
options.init,
),
);
}
};
let st_device = &|| {
let dev = SerialDevice { pin: options.pin };
if !options.csv {
println!("Using 1 thread, pin={}, alloc={}", options.pin, alloc_name);
}
if options.float {
run_cpu(
&options,
StreamData::new_in(
options.arraysize,
FLOAT_INIT_SCALAR,
FLOAT_INIT,
dev,
alloc,
options.malloc,
options.init,
),
);
} else {
run_cpu(
&options,
StreamData::new_in(
options.arraysize,
DOUBLE_START_SCALAR,
DOUBLE_INIT,
dev,
alloc,
options.malloc,
options.init,
),
);
}
};
let devices: Vec<(String, &'_ dyn Fn())> = vec![
("CPU (Rayon)".to_string(), rayon_device),
(format!("CPU (Crossbeam, pinning={})", options.pin), crossbeam_device),
("CPU (Single threaded)".to_string(), st_device),
];
if options.list { if options.list {
devices.iter().enumerate().for_each(|(i, (name, _))| { devices.iter().enumerate().for_each(|(i, (name, _))| {
println!("{}: {}", i, name); println!("[{}] {}", i, name);
}) })
} else { } else {
match devices.get(options.device) { match devices.get(options.device) {
Some((_, run)) => { Some((name, run)) => {
if !&options.csv { if !&options.csv {
println!("BabelStream\n\ println!(
"BabelStream\n\
Version: {}\n\ Version: {}\n\
Implementation: Rust+Rayon", VERSION.unwrap_or("unknown")) Implementation: Rust; {}",
VERSION.unwrap_or("unknown"),
name
);
if options.init {
println!("Initialising arrays on main thread");
} }
run(options);
} }
None => eprintln!("Device index({}) not available", options.device) run();
}
None => eprintln!("Device index {} not available", options.device),
} }
} }
} }

View File

@ -0,0 +1,61 @@
use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData};
use core_affinity::CoreId;
pub struct SerialDevice {
pub(crate) pin: bool,
}
// single threaded version
impl<T: ArrayType, A: AllocatorType> RustStream<T> for StreamData<T, SerialDevice, A> {
fn init_arrays(&mut self) {
if self.device.pin {
core_affinity::set_for_current(
match core_affinity::get_core_ids().as_ref().map(|x| x.first()) {
Some(Some(x)) => *x,
_ => CoreId { id: 0 },
},
);
}
self.a.fill(self.init.0);
self.b.fill(self.init.1);
self.c.fill(self.init.2);
}
fn copy(&mut self) {
for i in 0..self.size {
self.c[i] = self.a[i];
}
}
fn mul(&mut self) {
for i in 0..self.size {
self.b[i] = self.scalar * self.c[i];
}
}
fn add(&mut self) {
for i in 0..self.size {
self.c[i] = self.a[i] + self.b[i];
}
}
fn triad(&mut self) {
for i in 0..self.size {
self.a[i] = self.b[i] + self.scalar * self.c[i];
}
}
fn nstream(&mut self) {
for i in 0..self.size {
self.a[i] += self.b[i] * self.scalar * self.c[i];
}
}
fn dot(&mut self) -> T {
let mut sum = T::default();
for i in 0..self.size {
sum += self.a[i] * self.b[i];
}
sum
}
}

View File

@ -0,0 +1,77 @@
use std::iter::Sum;
use rayon::prelude::*;
use rayon::ThreadPool;
use crate::stream::{AllocatorType, ArrayType, RustStream, StreamData};
pub struct RayonDevice {
pub(crate) pool: ThreadPool,
}
// Rayon version, it should be semantically equal to the single threaded version
impl<T: ArrayType + Sync + Send + Sum, A: AllocatorType + Sync + Send> RustStream<T>
for StreamData<T, RayonDevice, A>
{
fn init_arrays(&mut self) {
let init = self.init;
self.a.par_iter_mut().for_each(|v| *v = init.0);
self.b.par_iter_mut().for_each(|v| *v = init.1);
self.c.par_iter_mut().for_each(|v| *v = init.2);
}
fn copy(&mut self) {
let a = &self.a;
let c = &mut self.c;
self.device.pool.install(|| {
(*c).par_iter_mut().enumerate().for_each(|(i, c)| *c = a[i]);
});
}
fn mul(&mut self) {
let scalar = self.scalar;
let c = &self.c;
let b = &mut self.b;
self
.device
.pool
.install(|| (*b).par_iter_mut().enumerate().for_each(|(i, b)| *b = scalar * c[i]));
}
fn add(&mut self) {
let a = &self.a;
let b = &self.b;
let c = &mut self.c;
self.device.pool.install(|| (*c).par_iter_mut().enumerate().for_each(|(i, c)| *c = a[i] + b[i]))
}
fn triad(&mut self) {
let scalar = self.scalar;
let a = &mut self.a;
let b = &self.b;
let c = &self.c;
self
.device
.pool
.install(|| (*a).par_iter_mut().enumerate().for_each(|(i, a)| *a = b[i] + scalar * c[i]))
}
fn nstream(&mut self) {
let scalar = self.scalar;
let a = &mut self.a;
let b = &self.b;
let c = &self.c;
self
.device
.pool
.install(|| (*a).par_iter_mut().enumerate().for_each(|(i, a)| *a += b[i] + scalar * c[i]))
}
fn dot(&mut self) -> T {
let a = &self.a;
let b = &self.b;
self.device.pool.install(|| {
(0..self.size).into_par_iter().fold(|| T::default(), |acc, i| acc + a[i] * b[i]).sum::<T>()
})
}
}

166
rust-stream/src/stream.rs Normal file
View File

@ -0,0 +1,166 @@
use num_traits::real::Real;
use num_traits::{NumAssign, Signed};
use std::alloc::Allocator;
use std::fmt::Debug;
use std::time::{Duration, Instant};
pub trait AllocatorType: Allocator + Copy + Clone + Default + Debug {}
impl<T: Allocator + Copy + Clone + Default + Debug> AllocatorType for T {}
pub struct StreamData<T, D, A: AllocatorType> {
pub device: D,
pub size: usize,
pub scalar: T,
pub init: (T, T, T),
pub a: Vec<T, A>,
pub b: Vec<T, A>,
pub c: Vec<T, A>,
pub needs_dealloc: bool,
}
#[inline(always)]
fn timed<F: FnOnce()>(f: F) -> Duration {
let start = Instant::now();
f();
start.elapsed()
}
#[inline(always)]
fn timed_mut<T, F: FnMut() -> T>(f: &mut F) -> (Duration, T) {
let start = Instant::now();
let x = f();
(start.elapsed(), x)
}
pub struct AllTiming<T> {
pub copy: T,
pub mul: T,
pub add: T,
pub triad: T,
pub dot: T,
}
pub trait ArrayType: Real + NumAssign + Signed + Default + Debug {}
impl<T: Real + NumAssign + Signed + Default + Debug> ArrayType for T {}
impl<T: Default + Clone, D, A: AllocatorType> StreamData<T, D, A> {
pub fn new_in(
size: usize,
scalar: T,
init: (T, T, T),
device: D,
allocator: A,
malloc: bool, //
initialise: bool, //
) -> StreamData<T, D, A> {
let mk_vec = || {
if malloc {
extern crate libc;
use std::mem;
unsafe {
// we do the typical C malloc with a NULL check here
let bytes = mem::size_of::<T>() * size;
let ptr = libc::malloc(bytes as libc::size_t) as *mut T;
if ptr.is_null() {
panic!(
"Cannot allocate {} bytes in `sizeof(T) * size` (T = {}, size = {})",
bytes,
mem::size_of::<T>(),
size
);
}
let mut xs = Vec::from_raw_parts_in(ptr, size, size, allocator);
if initialise {
xs.fill(T::default());
}
xs
}
} else {
if initialise {
let mut xs = Vec::new_in(allocator);
xs.resize(size, T::default());
xs
} else {
// try not to touch the vec after allocation
let mut xs = Vec::with_capacity_in(size, allocator);
unsafe {
xs.set_len(size);
}
xs
}
}
};
StreamData {
device,
size,
scalar,
init,
a: mk_vec(),
b: mk_vec(),
c: mk_vec(),
needs_dealloc: malloc,
}
}
pub fn clean_up(self) {
if self.needs_dealloc {
unsafe {
extern crate libc;
let free_ts = move |xs: Vec<T, A>| {
// make sure we don't call dealloc for vec anymore
// XXX it's important we don't free xs.as_mut_ptr() here and use xs.into_raw_parts_with_alloc()
// as that function handles drops semantic for us
// if we free the the raw ptr directly, the compiler will still drop the vec and then segfault
let (ptr, _, _, _) = xs.into_raw_parts_with_alloc();
libc::free(ptr as *mut libc::c_void);
};
free_ts(self.a);
free_ts(self.b);
free_ts(self.c);
}
}
}
}
pub trait RustStream<T: Default> {
fn init_arrays(&mut self);
fn copy(&mut self);
fn mul(&mut self);
fn add(&mut self);
fn triad(&mut self);
fn nstream(&mut self);
fn dot(&mut self) -> T;
fn run_all(&mut self, n: usize) -> (AllTiming<Vec<Duration>>, T) {
let mut timings: AllTiming<Vec<Duration>> = AllTiming {
copy: vec![Duration::default(); n],
mul: vec![Duration::default(); n],
add: vec![Duration::default(); n],
triad: vec![Duration::default(); n],
dot: vec![Duration::default(); n],
};
let mut last_sum = T::default();
for i in 0..n {
timings.copy[i] = timed(|| self.copy());
timings.mul[i] = timed(|| self.mul());
timings.add[i] = timed(|| self.add());
timings.triad[i] = timed(|| self.triad());
let (dot, sum) = timed_mut(&mut || self.dot());
timings.dot[i] = dot;
last_sum = sum;
}
(timings, last_sum)
}
fn run_triad(&mut self, n: usize) -> Duration {
timed(|| {
for _ in 0..n {
self.triad();
}
})
}
fn run_nstream(&mut self, n: usize) -> Vec<Duration> {
(0..n).map(|_| timed(|| self.nstream())).collect::<Vec<_>>()
}
}