diff --git a/README.md b/README.md index f9a4415..6c254f0 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,8 @@ Currently implemented are: This code was previously called GPU-STREAM. +This project also contains implementations in alternative languages with different build systems: +* Scala - [scala-stream](./scala-stream) How is this different to STREAM? -------------------------------- diff --git a/scala-stream/.bsp/sbt.json b/scala-stream/.bsp/sbt.json new file mode 100644 index 0000000..2e1edb1 --- /dev/null +++ b/scala-stream/.bsp/sbt.json @@ -0,0 +1 @@ +{"name":"sbt","version":"1.5.2","bspVersion":"2.0.0-M5","languages":["scala"],"argv":["/usr/lib/jvm/java-11-openjdk-11.0.11.0.9-2.fc33.x86_64/bin/java","-Xms100m","-Xmx100m","-classpath","/home/tom/.local/share/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45.plugins/Scala/launcher/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=/home/tom/.local/share/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45.plugins/Scala/launcher/sbt-launch.jar"]} \ No newline at end of file diff --git a/scala-stream/.gitignore b/scala-stream/.gitignore new file mode 100644 index 0000000..2f7896d --- /dev/null +++ b/scala-stream/.gitignore @@ -0,0 +1 @@ +target/ diff --git a/scala-stream/.jvmopts b/scala-stream/.jvmopts new file mode 100644 index 0000000..c1ef295 --- /dev/null +++ b/scala-stream/.jvmopts @@ -0,0 +1,2 @@ +-Xmx4096m +-Xss4m \ No newline at end of file diff --git a/scala-stream/.scalafmt.conf b/scala-stream/.scalafmt.conf new file mode 100644 index 0000000..8c7d0c8 --- /dev/null +++ b/scala-stream/.scalafmt.conf @@ -0,0 +1,34 @@ +version = "3.0.0-RC2" +runner.dialect = scala3 + +style = defaultWithAlign + +maxColumn = 100 + +align.preset = more + +rewrite.rules = [ + AvoidInfix + RedundantBraces + RedundantParens + AsciiSortImports + PreferCurlyFors +] + +rewrite.neverInfix.excludeFilters = [until + to + by + eq + ne + "should.*" + "contain.*" + "must.*" + in + be + taggedAs + thrownBy + synchronized + have + 
+ when + size + theSameElementsAs] \ No newline at end of file diff --git a/scala-stream/README.md b/scala-stream/README.md new file mode 100644 index 0000000..bf0e3f4 --- /dev/null +++ b/scala-stream/README.md @@ -0,0 +1,102 @@ +ScalaStream +=========== + +This is an implementation of BabelStream +in [Scala 3](https://docs.scala-lang.org/scala3/new-in-scala3.html) on the JVM. In theory, this +implementation also covers Java. Scala and Java, like any other programming language, have their own +ecosystem of library-supported parallel programming frameworks; we currently implement the +following: + +* Parallel streams (introduced in Java 8) - `src/main/scala/scalastream/J8SStream.scala` +* [Scala Parallel Collections](https://github.com/scala/scala-parallel-collections) + - `src/main/scala/scalastream/ParStream.scala` + +As the benchmark is relatively simple, we also implement some baselines: + +* Single-threaded Scala `for` (i.e. `foreach` sugar) - `src/main/scala/scalastream/PlainStream.scala` +* Manual parallelism with Java executors - `src/main/scala/scalastream/ThreadedStream.scala` + +### Performance considerations + +As Scala 3 defaults to Scala 2.13's standard library, we roll our own `Fractional` typeclass with +liberal use of inlining and specialisation. This is motivated by 2.13 stdlib's lack of +specialisation for primitive types on the default `Fractional` and `Numeric` typeclasses. + +The use of [Spire](https://github.com/typelevel/spire) to mitigate this was attempted, however, due +to its use of Scala 2 macros, it currently doesn't compile with Scala 3. 
+ +### Build & Run + +Prerequisites + +* JDK >= 8 on any of its supported platforms; known working implementations: + - OpenJDK + distributions ([Amazon Corretto](https://docs.aws.amazon.com/corretto/latest/corretto-11-ug/downloads-list.html) + , [Azul](https://www.azul.com/downloads/?version=java-11-lts&package=jdk) + , [AdoptOpenJDK](https://adoptopenjdk.net/), etc.) + - Oracle Graal CE/EE 8+ + +To run the benchmark, first create a binary: + +```shell +> ./sbt assembly +``` + +The binary will be located at `./target/scala-3.0.0/scala-stream.jar`. Run it with: + +```shell +> java -version +openjdk version "11.0.11" 2021-04-20 +OpenJDK Runtime Environment 18.9 (build 11.0.11+9) +OpenJDK 64-Bit Server VM 18.9 (build 11.0.11+9, mixed mode, sharing) +> java -jar target/scala-3.0.0/scala-stream.jar --help + +``` + +For best results, benchmark with the following JVM flags: + +``` +-XX:-UseOnStackReplacement # disable OSR, not useful for this benchmark as we are measuring peak performance +-XX:-TieredCompilation # disable C1, go straight to C2 +-XX:ReservedCodeCacheSize=512m # don't flush compiled code out of cache at any point +``` + +Worked example: + +```shell +> java -XX:-UseOnStackReplacement -XX:-TieredCompilation -XX:ReservedCodeCacheSize=512m -jar target/scala-3.0.0/scala-stream.jar + +BabelStream +Version: 3.4.0 +Implementation: Scala Parallel Collections; Scala (Java 11.0.11; Red Hat, Inc.; home=/usr/lib/jvm/java-11-openjdk-11.0.11.0.9-2.fc33.x86_64) +Running kernels 100 times +Precision: double +Array size: 268.4 MB (=0.3 GB) +Total size: 805.3 MB (=0.8 GB) +Function MBytes/sec Min (sec) Max Average +Copy 4087.077 0.13136 0.24896 0.15480 +Mul 2934.709 0.18294 0.28706 0.21627 +Add 3016.342 0.26698 0.39835 0.31119 +Triad 3016.496 0.26697 0.37612 0.31040 +Dot 2216.096 0.24226 0.41235 0.28264 + +``` + +### Graal Native Image + +The port has partial support for Graal Native Image; to generate one, run: + +```shell +> ./sbt nativeImage +``` + +The ELF binary will be 
+located at `./target/native-image/scala-stream`; relocation should work on +the same architecture the binary is built on. + +There's an ongoing bug with Scala 3's use of `lazy val`s where the program crashes at declaration +site. Currently, Scala Parallel Collections uses this feature internally, so selecting this device +will crash at runtime. + +The bug originates from the use of `Unsafe` in `lazy val` for thread safety guarantees. It seems +that Graal only supports limited uses of this JVM implementation detail and Scala 3 happens to be on +the unsupported side. \ No newline at end of file diff --git a/scala-stream/build.sbt b/scala-stream/build.sbt new file mode 100644 index 0000000..4194acb --- /dev/null +++ b/scala-stream/build.sbt @@ -0,0 +1,29 @@ +lazy val mainCls = Some("scalastream.App") + +lazy val root = (project in file(".")) + .enablePlugins(NativeImagePlugin) + .settings( + scalaVersion := "3.0.0", + version := "3.4.0", + organization := "uk.ac.bristol.uob-hpc", + organizationName := "University of Bristol", + Compile / mainClass := mainCls, + assembly / mainClass := mainCls, + scalacOptions ~= filterConsoleScalacOptions, + assembly / assemblyJarName := "scala-stream.jar", + nativeImageOptions := Seq( + "--no-fallback", + "-H:ReflectionConfigurationFiles=../../reflect-config.json" + ), + nativeImageVersion := "21.1.0", + (Global / excludeLintKeys) += nativeImageVersion, + name := "scala-stream", + libraryDependencies ++= Seq( + // Lazy val implementation in Scala 3 triggers an exception in nativeImage, use 2_13 for arg parsing for now otherwise we can't get to the benchmarking part + ("com.github.scopt" %% "scopt" % "4.0.1").cross(CrossVersion.for3Use2_13), + // par also uses lazy val at some point, so it doesn't work in nativeImage + "org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.3", + "net.openhft" % "affinity" % "3.21ea1", + "org.slf4j" % "slf4j-simple" % "1.7.30" // for affinity + ) + ) diff --git 
a/scala-stream/project/build.properties b/scala-stream/project/build.properties new file mode 100644 index 0000000..19479ba --- /dev/null +++ b/scala-stream/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.5.2 diff --git a/scala-stream/project/plugins.sbt b/scala-stream/project/plugins.sbt new file mode 100644 index 0000000..2c82902 --- /dev/null +++ b/scala-stream/project/plugins.sbt @@ -0,0 +1,6 @@ +addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.3") +addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.17") +addSbtPlugin("org.scalameta" % "sbt-native-image" % "0.3.0") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0") +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.27") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") diff --git a/scala-stream/reflect-config.json b/scala-stream/reflect-config.json new file mode 100644 index 0000000..9e8b089 --- /dev/null +++ b/scala-stream/reflect-config.json @@ -0,0 +1,11 @@ +[ + { + "name": "sun.misc.Unsafe", + "fields": [ + { + "name": "theUnsafe", + "allowUnsafeAccess": true + } + ] + } +] \ No newline at end of file diff --git a/scala-stream/sbt b/scala-stream/sbt new file mode 100755 index 0000000..efdfda6 --- /dev/null +++ b/scala-stream/sbt @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +./sbt-dist/bin/sbt "$@" \ No newline at end of file diff --git a/scala-stream/sbt-dist/bin/java9-rt-export.jar b/scala-stream/sbt-dist/bin/java9-rt-export.jar new file mode 100644 index 0000000..cbabfb0 Binary files /dev/null and b/scala-stream/sbt-dist/bin/java9-rt-export.jar differ diff --git a/scala-stream/sbt-dist/bin/sbt b/scala-stream/sbt-dist/bin/sbt new file mode 100755 index 0000000..cca77be --- /dev/null +++ b/scala-stream/sbt-dist/bin/sbt @@ -0,0 +1,177 @@ +#!/usr/bin/env bash + + +### ------------------------------- ### +### Helper methods for BASH scripts ### +### ------------------------------- ### + +realpath () { +( + TARGET_FILE="$1" + FIX_CYGPATH="$2" + + cd "$(dirname 
"$TARGET_FILE")" + TARGET_FILE=$(basename "$TARGET_FILE") + + COUNT=0 + while [ -L "$TARGET_FILE" -a $COUNT -lt 100 ] + do + TARGET_FILE=$(readlink "$TARGET_FILE") + cd "$(dirname "$TARGET_FILE")" + TARGET_FILE=$(basename "$TARGET_FILE") + COUNT=$(($COUNT + 1)) + done + + # make sure we grab the actual windows path, instead of cygwin's path. + if [[ "x$FIX_CYGPATH" != "x" ]]; then + echo "$(cygwinpath "$(pwd -P)/$TARGET_FILE")" + else + echo "$(pwd -P)/$TARGET_FILE" + fi +) +} + + +# Uses uname to detect if we're in the odd cygwin environment. +is_cygwin() { + local os=$(uname -s) + case "$os" in + CYGWIN*) return 0 ;; + MINGW*) return 0 ;; + MSYS*) return 0 ;; + *) return 1 ;; + esac +} + +# TODO - Use nicer bash-isms here. +CYGWIN_FLAG=$(if is_cygwin; then echo true; else echo false; fi) + + +# This can fix cygwin style /cygdrive paths so we get the +# windows style paths. +cygwinpath() { + local file="$1" + if [[ "$CYGWIN_FLAG" == "true" ]]; then + echo $(cygpath -w $file) + else + echo $file + fi +} + +. "$(dirname "$(realpath "$0")")/sbt-launch-lib.bash" + + +declare -r noshare_opts="-Dsbt.global.base=project/.sbtboot -Dsbt.boot.directory=project/.boot -Dsbt.ivy.home=project/.ivy" +declare -r sbt_opts_file=".sbtopts" +declare -r etc_sbt_opts_file="/etc/sbt/sbtopts" +declare -r dist_sbt_opts_file="${sbt_home}/conf/sbtopts" +declare -r win_sbt_opts_file="${sbt_home}/conf/sbtconfig.txt" + +usage() { + cat < path to global settings/plugins directory (default: ~/.sbt) + -sbt-boot path to shared boot directory (default: ~/.sbt/boot in 0.11 series) + -ivy path to local Ivy repository (default: ~/.ivy2) + -mem set memory options (default: $sbt_default_mem, which is $(get_mem_opts)) + -no-share use all local caches; no sharing + -no-global uses global caches, but does not use global ~/.sbt directory. + -jvm-debug Turn on JVM debugging, open at the given port. 
+ -batch Disable interactive mode + + # sbt version (default: from project/build.properties if present, else latest release) + -sbt-version use the specified version of sbt + -sbt-jar use the specified jar as the sbt launcher + -sbt-rc use an RC version of sbt + -sbt-snapshot use a snapshot version of sbt + + # java version (default: java from PATH, currently $(java -version 2>&1 | grep version)) + -java-home alternate JAVA_HOME + + # jvm options and output control + JAVA_OPTS environment variable, if unset uses "$java_opts" + .jvmopts if this file exists in the current directory, its contents + are appended to JAVA_OPTS + SBT_OPTS environment variable, if unset uses "$default_sbt_opts" + .sbtopts if this file exists in the current directory, its contents + are prepended to the runner args + /etc/sbt/sbtopts if this file exists, it is prepended to the runner args + -Dkey=val pass -Dkey=val directly to the java runtime + -J-X pass option -X directly to the java runtime + (-J is stripped) + -S-X add -X to sbt's scalacOptions (-S is stripped) + +In the case of duplicated or conflicting options, the order above +shows precedence: JAVA_OPTS lowest, command line options highest. 
+EOM +} + + + +process_my_args () { + while [[ $# -gt 0 ]]; do + case "$1" in + -no-colors) addJava "-Dsbt.log.noformat=true" && shift ;; + -no-share) addJava "$noshare_opts" && shift ;; + -no-global) addJava "-Dsbt.global.base=$(pwd)/project/.sbtboot" && shift ;; + -sbt-boot) require_arg path "$1" "$2" && addJava "-Dsbt.boot.directory=$2" && shift 2 ;; + -sbt-dir) require_arg path "$1" "$2" && addJava "-Dsbt.global.base=$2" && shift 2 ;; + -debug-inc) addJava "-Dxsbt.inc.debug=true" && shift ;; + -batch) exec + link=$(expr "$ls" : '.*-> \(.*\)$') + if expr "$link" : '/.*' > /dev/null; then + SCRIPT="$link" + else + SCRIPT=$(dirname "$SCRIPT")/"$link" + fi +done +declare -r sbt_bin_dir="$(dirname "$SCRIPT")" +declare -r sbt_home="$(dirname "$sbt_bin_dir")" + +echoerr () { + echo 1>&2 "$@" +} +vlog () { + [[ $verbose || $debug ]] && echoerr "$@" +} +dlog () { + [[ $debug ]] && echoerr "$@" +} + +jar_file () { + echo "$(cygwinpath "${sbt_home}/bin/sbt-launch.jar")" +} + +acquire_sbt_jar () { + sbt_jar="$(jar_file)" + + if [[ ! -f "$sbt_jar" ]]; then + echoerr "Could not find launcher jar: $sbt_jar" + exit 2 + fi +} + +rt_export_file () { + echo "${sbt_bin_dir}/java9-rt-export.jar" +} + +execRunner () { + # print the arguments one to a line, quoting any containing spaces + [[ $verbose || $debug ]] && echo "# Executing command line:" && { + for arg; do + if printf "%s\n" "$arg" | grep -q ' '; then + printf "\"%s\"\n" "$arg" + else + printf "%s\n" "$arg" + fi + done + echo "" + } + + # THis used to be exec, but we loose the ability to re-hook stty then + # for cygwin... Maybe we should flag the feature here... 
+ "$@" +} + +addJava () { + dlog "[addJava] arg = '$1'" + java_args=( "${java_args[@]}" "$1" ) +} +addSbt () { + dlog "[addSbt] arg = '$1'" + sbt_commands=( "${sbt_commands[@]}" "$1" ) +} +addResidual () { + dlog "[residual] arg = '$1'" + residual_args=( "${residual_args[@]}" "$1" ) +} +addDebugger () { + addJava "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$1" +} + +get_mem_opts () { + # if we detect any of these settings in ${JAVA_OPTS} or ${JAVA_TOOL_OPTIONS} we need to NOT output our settings. + # The reason is the Xms/Xmx, if they don't line up, cause errors. + if [[ "${JAVA_OPTS}" == *-Xmx* ]] || [[ "${JAVA_OPTS}" == *-Xms* ]] || [[ "${JAVA_OPTS}" == *-XX:MaxPermSize* ]] || [[ "${JAVA_OPTS}" == *-XX:MaxMetaspaceSize* ]] || [[ "${JAVA_OPTS}" == *-XX:ReservedCodeCacheSize* ]]; then + echo "" + elif [[ "${JAVA_TOOL_OPTIONS}" == *-Xmx* ]] || [[ "${JAVA_TOOL_OPTIONS}" == *-Xms* ]] || [[ "${JAVA_TOOL_OPTIONS}" == *-XX:MaxPermSize* ]] || [[ "${JAVA_TOOL_OPTIONS}" == *-XX:MaxMetaspaceSize* ]] || [[ "${JAVA_TOOL_OPTIONS}" == *-XX:ReservedCodeCacheSize* ]]; then + echo "" + elif [[ "${SBT_OPTS}" == *-Xmx* ]] || [[ "${SBT_OPTS}" == *-Xms* ]] || [[ "${SBT_OPTS}" == *-XX:MaxPermSize* ]] || [[ "${SBT_OPTS}" == *-XX:MaxMetaspaceSize* ]] || [[ "${SBT_OPTS}" == *-XX:ReservedCodeCacheSize* ]]; then + echo "" + else + # a ham-fisted attempt to move some memory settings in concert + # so they need not be messed around with individually. 
+ local mem=${1:-$sbt_default_mem} + local codecache=$(( $mem / 8 )) + (( $codecache > 128 )) || codecache=128 + (( $codecache < 512 )) || codecache=512 + local class_metadata_size=$(( $codecache * 2 )) + if [[ -z $java_version ]]; then + java_version=$(jdk_version) + fi + local class_metadata_opt=$((( $java_version < 8 )) && echo "MaxPermSize" || echo "MaxMetaspaceSize") + + local arg_xms=$([[ "${java_args[@]}" == *-Xms* ]] && echo "" || echo "-Xms${mem}m") + local arg_xmx=$([[ "${java_args[@]}" == *-Xmx* ]] && echo "" || echo "-Xmx${mem}m") + local arg_rccs=$([[ "${java_args[@]}" == *-XX:ReservedCodeCacheSize* ]] && echo "" || echo "-XX:ReservedCodeCacheSize=${codecache}m") + local arg_meta=$([[ "${java_args[@]}" == *-XX:${class_metadata_opt}* && ! (( $java_version < 8 )) ]] && echo "" || echo "-XX:${class_metadata_opt}=${class_metadata_size}m") + + echo "${arg_xms} ${arg_xmx} ${arg_rccs} ${arg_meta}" + fi +} + +get_gc_opts () { + local older_than_9=$(( $java_version < 9 )) + + if [[ "$older_than_9" == "1" ]]; then + # don't need to worry about gc + echo "" + elif [[ "${JAVA_OPTS}" =~ Use.*GC ]] || [[ "${JAVA_TOOL_OPTIONS}" =~ Use.*GC ]] || [[ "${SBT_OPTS}" =~ Use.*GC ]] ; then + # GC arg has been passed in - don't change + echo "" + else + # Java 9+ so revert to old + echo "-XX:+UseParallelGC" + fi +} + +require_arg () { + local type="$1" + local opt="$2" + local arg="$3" + if [[ -z "$arg" ]] || [[ "${arg:0:1}" == "-" ]]; then + echo "$opt requires <$type> argument" + exit 1 + fi +} + +is_function_defined() { + declare -f "$1" > /dev/null +} + +# parses JDK version from the -version output line. 
+# 8 for 1.8.0_nn, 9 for 9-ea etc, and "no_java" for undetected +jdk_version() { + local result + local lines=$("$java_cmd" -Xms32M -Xmx32M -version 2>&1 | tr '\r' '\n') + local IFS=$'\n' + for line in $lines; do + if [[ (-z $result) && ($line = *"version \""*) ]] + then + local ver=$(echo $line | sed -e 's/.*version "\(.*\)"\(.*\)/\1/; 1q') + # on macOS sed doesn't support '?' + if [[ $ver = "1."* ]] + then + result=$(echo $ver | sed -e 's/1\.\([0-9]*\)\(.*\)/\1/; 1q') + else + result=$(echo $ver | sed -e 's/\([0-9]*\)\(.*\)/\1/; 1q') + fi + fi + done + if [[ -z $result ]] + then + result=no_java + fi + echo "$result" +} + +process_args () { + while [[ $# -gt 0 ]]; do + case "$1" in + -h|-help) usage; exit 1 ;; + -v|-verbose) verbose=1 && shift ;; + -d|-debug) debug=1 && addSbt "-debug" && shift ;; + + -ivy) require_arg path "$1" "$2" && addJava "-Dsbt.ivy.home=$2" && shift 2 ;; + -mem) require_arg integer "$1" "$2" && sbt_mem="$2" && shift 2 ;; + -jvm-debug) require_arg port "$1" "$2" && addDebugger $2 && shift 2 ;; + -batch) exec /dev/null 2>&1 && { + mkdir -p "$target_preloaded" + rsync -a --ignore-existing "$source_preloaded" "$target_preloaded" + } + } + } +} + +# Detect that we have java installed. +checkJava() { + local required_version="$1" + # Now check to see if it's a good enough version + local good_enough="$(expr $java_version ">=" $required_version)" + if [[ "$java_version" == "" ]]; then + echo + echo "No Java Development Kit (JDK) installation was detected." + echo Please go to http://www.oracle.com/technetwork/java/javase/downloads/ and download. + echo + exit 1 + elif [[ "$good_enough" != "1" ]]; then + echo + echo "The Java Development Kit (JDK) installation you have is not up to date." + echo $script_name requires at least version $required_version+, you have + echo version $java_version + echo + echo Please go to http://www.oracle.com/technetwork/java/javase/downloads/ and download + echo a valid JDK and install before running $script_name. 
+ echo + exit 1 + fi +} + +copyRt() { + local at_least_9="$(expr $java_version ">=" 9)" + if [[ "$at_least_9" == "1" ]]; then + rtexport=$(rt_export_file) + # The grep for java9-rt-ext- matches the filename prefix printed in Export.java + java9_ext=$("$java_cmd" ${JAVA_OPTS} ${SBT_OPTS:-$default_sbt_opts} ${java_args[@]} \ + -jar "$rtexport" --rt-ext-dir | grep java9-rt-ext-) + java9_rt=$(echo "$java9_ext/rt.jar") + vlog "[copyRt] java9_rt = '$java9_rt'" + if [[ ! -f "$java9_rt" ]]; then + echo Copying runtime jar. + mkdir -p "$java9_ext" + execRunner "$java_cmd" \ + ${JAVA_OPTS} \ + ${SBT_OPTS:-$default_sbt_opts} \ + ${java_args[@]} \ + -jar "$rtexport" \ + "${java9_rt}" + fi + addJava "-Dscala.ext.dirs=${java9_ext}" + fi +} + +run() { + # process the combined args, then reset "$@" to the residuals + process_args "$@" + set -- "${residual_args[@]}" + argumentCount=$# + + # Copy preloaded repo to user's preloaded directory + syncPreloaded + + # no jar? download it. + [[ -f "$sbt_jar" ]] || acquire_sbt_jar "$sbt_version" || { + # still no jar? uh-oh. + echo "Download failed. Obtain the sbt-launch.jar manually and place it at $sbt_jar" + exit 1 + } + + # TODO - java check should be configurable... + checkJava "6" + + # Java 9 support + copyRt + + #If we're in cygwin, we should use the windows config, and terminal hacks + if [[ "$CYGWIN_FLAG" == "true" ]]; then + stty -icanon min 1 -echo > /dev/null 2>&1 + addJava "-Djline.terminal=jline.UnixTerminal" + addJava "-Dsbt.cygwin=true" + fi + + # run sbt + execRunner "$java_cmd" \ + $(get_mem_opts $sbt_mem) \ + $(get_gc_opts) \ + ${JAVA_OPTS} \ + ${SBT_OPTS:-$default_sbt_opts} \ + ${java_args[@]} \ + -jar "$sbt_jar" \ + "${sbt_commands[@]}" \ + "${residual_args[@]}" + + exit_code=$? + + # Clean up the terminal from cygwin hacks. 
+ if [[ "$CYGWIN_FLAG" == "true" ]]; then + stty icanon echo > /dev/null 2>&1 + fi + exit $exit_code +} diff --git a/scala-stream/sbt-dist/bin/sbt-launch.jar b/scala-stream/sbt-dist/bin/sbt-launch.jar new file mode 100644 index 0000000..26ab884 Binary files /dev/null and b/scala-stream/sbt-dist/bin/sbt-launch.jar differ diff --git a/scala-stream/sbt-dist/bin/sbt.bat b/scala-stream/sbt-dist/bin/sbt.bat new file mode 100644 index 0000000..1827961 --- /dev/null +++ b/scala-stream/sbt-dist/bin/sbt.bat @@ -0,0 +1,212 @@ +@REM SBT launcher script +@REM +@REM Environment: +@REM JAVA_HOME - location of a JDK home dir (mandatory) +@REM SBT_OPTS - JVM options (optional) +@REM Configuration: +@REM sbtconfig.txt found in the SBT_HOME. + +@REM ZOMG! We need delayed expansion to build up CFG_OPTS later +@setlocal enabledelayedexpansion + +@echo off +set SBT_HOME=%~dp0 +set SBT_ARGS= + +rem FIRST we load the config file of extra options. +set FN=%SBT_HOME%\..\conf\sbtconfig.txt +set CFG_OPTS= +FOR /F "tokens=* eol=# usebackq delims=" %%i IN ("%FN%") DO ( + set DO_NOT_REUSE_ME=%%i + rem ZOMG (Part #2) WE use !! here to delay the expansion of + rem CFG_OPTS, otherwise it remains "" for this loop. + set CFG_OPTS=!CFG_OPTS! !DO_NOT_REUSE_ME! +) + +rem poor man's jenv (which is not available on Windows) +IF DEFINED JAVA_HOMES ( + IF EXIST .java-version FOR /F %%A IN (.java-version) DO ( + SET JAVA_HOME=%JAVA_HOMES%\%%A + SET JDK_HOME=%JAVA_HOMES%\%%A + ) +) +rem must set PATH or wrong javac is used for java projects +IF DEFINED JAVA_HOME SET "PATH=%JAVA_HOME%\bin;%PATH%" + +rem users can set JAVA_OPTS via .jvmopts (sbt-extras style) +IF EXIST .jvmopts FOR /F %%A IN (.jvmopts) DO ( + SET _jvmopts_line=%%A + IF NOT "!_jvmopts_line:~0,1!"=="#" ( + SET JAVA_OPTS=%%A !JAVA_OPTS! 
+ ) +) +rem We use the value of the JAVACMD environment variable if defined +set _JAVACMD=%JAVACMD% + +if "%_JAVACMD%"=="" ( + if not "%JAVA_HOME%"=="" ( + if exist "%JAVA_HOME%\bin\java.exe" set "_JAVACMD=%JAVA_HOME%\bin\java.exe" + ) +) + +if "%_JAVACMD%"=="" set _JAVACMD=java + +rem We use the value of the JAVA_OPTS environment variable if defined, rather than the config. +set _JAVA_OPTS=%JAVA_OPTS% +if "%_JAVA_OPTS%"=="" set _JAVA_OPTS=%CFG_OPTS% + +set INIT_SBT_VERSION=1.2.8 + +:args_loop +if "%~1" == "" goto args_end + +if "%~1" == "-jvm-debug" ( + set JVM_DEBUG=true + set /a JVM_DEBUG_PORT=5005 2>nul >nul +) else if "!JVM_DEBUG!" == "true" ( + set /a JVM_DEBUG_PORT=%1 2>nul >nul + if not "%~1" == "!JVM_DEBUG_PORT!" ( + set SBT_ARGS=!SBT_ARGS! %1 + ) +) else if /I "%~1" == "new" ( + set sbt_new=true + set SBT_ARGS=!SBT_ARGS! %1 +) else ( + set SBT_ARGS=!SBT_ARGS! %1 +) + +shift +goto args_loop +:args_end + +rem Confirm a user's intent if the current directory does not look like an sbt +rem top-level directory and the "new" command was not given. +if not exist build.sbt ( + if not exist project\ ( + if not defined sbt_new ( + echo [warn] Neither build.sbt nor a 'project' directory in the current directory: %CD% + setlocal +:confirm + echo c^) continue + echo q^) quit + + set /P reply=?^ + if /I "!reply!" == "c" ( + goto confirm_end + ) else if /I "!reply!" == "q" ( + exit /B 1 + ) + + goto confirm +:confirm_end + endlocal + ) + ) +) + +call :process + +call :checkjava + +call :copyrt + +if defined JVM_DEBUG_PORT ( + set _JAVA_OPTS=!_JAVA_OPTS! -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=!JVM_DEBUG_PORT! 
+) + +call :sync_preloaded + +call :run %SBT_ARGS% + +if ERRORLEVEL 1 goto error +goto end + +:run + +"%_JAVACMD%" %_JAVA_OPTS% %SBT_OPTS% -cp "%SBT_HOME%sbt-launch.jar" xsbt.boot.Boot %* +goto :eof + +:process +rem Parses x out of 1.x; for example 8 out of java version 1.8.0_xx +rem Otherwise, parses the major version; 9 out of java version 9-ea +set JAVA_VERSION=0 +for /f "tokens=3" %%g in ('"%_JAVACMD%" -Xms32M -Xmx32M -version 2^>^&1 ^| findstr /i version') do ( + set JAVA_VERSION=%%g +) +set JAVA_VERSION=%JAVA_VERSION:"=% +for /f "delims=.-_ tokens=1-2" %%v in ("%JAVA_VERSION%") do ( + if /I "%%v" EQU "1" ( + set JAVA_VERSION=%%w + ) else ( + set JAVA_VERSION=%%v + ) +) +exit /B 0 + +:checkjava +set required_version=6 +if /I %JAVA_VERSION% GEQ %required_version% ( + exit /B 0 +) +echo. +echo The Java Development Kit (JDK) installation you have is not up to date. +echo sbt requires at least version %required_version%+, you have +echo version %JAVA_VERSION% +echo. +echo Please go to http://www.oracle.com/technetwork/java/javase/downloads/ and download +echo a valid JDK and install before running sbt. +echo. +exit /B 1 + +:copyrt +if /I %JAVA_VERSION% GEQ 9 ( + set rtexport=!SBT_HOME!java9-rt-export.jar + + "%_JAVACMD%" %_JAVA_OPTS% %SBT_OPTS% -jar "!rtexport!" --rt-ext-dir > "%TEMP%.\rtext.txt" + set /p java9_ext= < "%TEMP%.\rtext.txt" + set java9_rt=!java9_ext!\rt.jar + + if not exist "!java9_rt!" ( + mkdir "!java9_ext!" + "%_JAVACMD%" %_JAVA_OPTS% %SBT_OPTS% -jar "!rtexport!" "!java9_rt!" + ) + set _JAVA_OPTS=!_JAVA_OPTS! -Dscala.ext.dirs="!java9_ext!" + + rem check to see if a GC has been set in the opts + echo !_JAVA_OPTS! | findstr /r "Use.*GC" >nul + if ERRORLEVEL 1 ( + rem don't have a GC set - revert to old GC + set _JAVA_OPTS=!_JAVA_OPTS! 
-XX:+UseParallelGC + ) +) +exit /B 0 + +:sync_preloaded +if "%INIT_SBT_VERSION%"=="" ( + rem FIXME: better %INIT_SBT_VERSION% detection + FOR /F "tokens=* USEBACKQ" %%F IN (`dir /b "%SBT_HOME%\..\lib\local-preloaded\org.scala-sbt\sbt" /B`) DO ( + SET INIT_SBT_VERSION=%%F + ) +) +set PRELOAD_SBT_JAR="%UserProfile%\.sbt\preloaded\org.scala-sbt\sbt\%INIT_SBT_VERSION%\jars\sbt.jar" +if /I %JAVA_VERSION% GEQ 8 ( + where robocopy >nul 2>nul + if %ERRORLEVEL% equ 0 ( + REM echo %PRELOAD_SBT_JAR% + if not exist %PRELOAD_SBT_JAR% ( + if exist "%SBT_HOME%\..\lib\local-preloaded\" ( + echo "about to robocopy" + robocopy "%SBT_HOME%\..\lib\local-preloaded" "%UserProfile%\.sbt\preloaded" /E + ) + ) + ) +) +exit /B 0 + +:error +@endlocal +exit /B 1 + +:end +@endlocal +exit /B 0 diff --git a/scala-stream/sbt-dist/conf/sbtconfig.txt b/scala-stream/sbt-dist/conf/sbtconfig.txt new file mode 100644 index 0000000..a4da43e --- /dev/null +++ b/scala-stream/sbt-dist/conf/sbtconfig.txt @@ -0,0 +1,14 @@ +# Set the java args to high + +-Xmx512M + +-XX:MaxPermSize=256m + +-XX:ReservedCodeCacheSize=128m + + + +# Set the extra SBT options + +-Dsbt.log.format=true + diff --git a/scala-stream/sbt-dist/conf/sbtopts b/scala-stream/sbt-dist/conf/sbtopts new file mode 100644 index 0000000..f018465 --- /dev/null +++ b/scala-stream/sbt-dist/conf/sbtopts @@ -0,0 +1,49 @@ +# ------------------------------------------------ # +# The SBT Configuration file. # +# ------------------------------------------------ # + + +# Disable ANSI color codes +# +#-no-colors + +# Starts sbt even if the current directory contains no sbt project. +# +-sbt-create + +# Path to global settings/plugins directory (default: ~/.sbt) +# +#-sbt-dir /etc/sbt + +# Path to shared boot directory (default: ~/.sbt/boot in 0.11 series) +# +#-sbt-boot ~/.sbt/boot + +# Path to local Ivy repository (default: ~/.ivy2) +# +#-ivy ~/.ivy2 + +# set memory options +# +#-mem + +# Use local caches for projects, no sharing. 
+# +#-no-share + +# Put SBT in offline mode. +# +#-offline + +# Sets the SBT version to use. +#-sbt-version 0.11.3 + +# Scala version (default: latest release) +# +#-scala-home +#-scala-version + +# java version (default: java from PATH, currently $(java -version |& grep version)) +# +#-java-home + diff --git a/scala-stream/src/main/scala/scalastream/J8SStream.scala b/scala-stream/src/main/scala/scalastream/J8SStream.scala new file mode 100644 index 0000000..ba509a5 --- /dev/null +++ b/scala-stream/src/main/scala/scalastream/J8SStream.scala @@ -0,0 +1,44 @@ +package scalastream + +import scalastream.App.{Config, Data} + +import scala.collection.immutable.ArraySeq +import scala.reflect.{ClassTag, classTag} + +class J8SStream[@specialized(Float, Double) A: Fractional: ClassTag](val config: Config[A]) + extends ScalaStream[A]: + + private var a: Array[A] = _ + private var b: Array[A] = _ + private var c: Array[A] = _ + private val scalar: A = config.scalar + + inline private def stream = + java.util.stream.IntStream.range(0, config.options.arraysize).parallel() + + override inline def initArrays(): Unit = + a = Array.ofDim(config.options.arraysize) + b = Array.ofDim(config.options.arraysize) + c = Array.ofDim(config.options.arraysize) + stream.forEach { i => + a(i) = config.init._1 + b(i) = config.init._2 + c(i) = config.init._3 + } + + override inline def copy(): Unit = stream.forEach(i => c(i) = a(i)) + override inline def mul(): Unit = stream.forEach(i => b(i) = scalar * c(i)) + override inline def add(): Unit = stream.forEach(i => c(i) = a(i) + b(i)) + override inline def triad(): Unit = stream.forEach(i => a(i) = b(i) + scalar * c(i)) + override inline def nstream(): Unit = stream.forEach(i => a(i) = b(i) * scalar * c(i)) + override inline def dot(): A = + // horrible special-case for double, there isn't a mapToFloat so we give up on that + val cls = classTag[A].runtimeClass + if java.lang.Double.TYPE == cls then + stream + .mapToDouble(i => (a(i) * 
b(i)).asInstanceOf[Double]) + .reduce(0, (l: Double, r: Double) => l + r) + .asInstanceOf[A] + else stream.mapToObj[A](i => a(i) * b(i)).reduce(0.fractional, (l: A, r: A) => l + r) + + override inline def data(): Data[A] = Data(a.to(ArraySeq), b.to(ArraySeq), c.to(ArraySeq)) diff --git a/scala-stream/src/main/scala/scalastream/ParStream.scala b/scala-stream/src/main/scala/scalastream/ParStream.scala new file mode 100644 index 0000000..bb146a2 --- /dev/null +++ b/scala-stream/src/main/scala/scalastream/ParStream.scala @@ -0,0 +1,36 @@ +package scalastream + +import scalastream.App.{Config, Data} + +import scala.collection.immutable.ArraySeq +import scala.collection.parallel.CollectionConverters._ +import scala.reflect.ClassTag +class ParStream[@specialized(Float, Double) A: Fractional: ClassTag](val config: Config[A]) + extends ScalaStream[A]: + + private var a: Array[A] = _ + private var b: Array[A] = _ + private var c: Array[A] = _ + private val scalar: A = config.scalar + + inline private def indices = (0 until config.options.arraysize).par + + override inline def initArrays(): Unit = + a = Array.ofDim(config.options.arraysize) + b = Array.ofDim(config.options.arraysize) + c = Array.ofDim(config.options.arraysize) + + for i <- indices do + a(i) = config.init._1 + b(i) = config.init._2 + c(i) = config.init._3 + + override inline def copy(): Unit = for i <- indices do c(i) = a(i) + override inline def mul(): Unit = for i <- indices do b(i) = scalar * c(i) + override inline def add(): Unit = for i <- indices do c(i) = a(i) + b(i) + override inline def triad(): Unit = for i <- indices do a(i) = b(i) + scalar * c(i) + override inline def nstream(): Unit = for i <- indices do a(i) = b(i) * scalar * c(i) + override inline def dot(): A = + indices.aggregate[A](0.fractional)((acc, i) => acc + (a(i) * b(i)), _ + _) + + override inline def data(): Data[A] = Data(a.to(ArraySeq), b.to(ArraySeq), c.to(ArraySeq)) diff --git 
a/scala-stream/src/main/scala/scalastream/PlainStream.scala b/scala-stream/src/main/scala/scalastream/PlainStream.scala
new file mode 100644
index 0000000..2b42571
--- /dev/null
+++ b/scala-stream/src/main/scala/scalastream/PlainStream.scala
@@ -0,0 +1,41 @@
+package scalastream
+
+import scalastream.App.{Config, Data}
+
+import scala.collection.immutable.ArraySeq
+import scala.reflect.ClassTag
+
+/** Single-threaded baseline: every kernel is a sequential `for` over the array indices.
+  *
+  * @param config supplies the array size, the scalar and the initial values of the arrays
+  */
+class PlainStream[@specialized(Float, Double) A: Fractional: ClassTag](val config: Config[A])
+    extends ScalaStream[A]:
+
+  private var a: Array[A] = _
+  private var b: Array[A] = _
+  private var c: Array[A] = _
+  private val scalar: A   = config.scalar
+
+  /** Allocates the three arrays, each filled with its value from `config.init`. */
+  override inline def initArrays(): Unit =
+    a = Array.fill(config.options.arraysize)(config.init._1)
+    b = Array.fill(config.options.arraysize)(config.init._2)
+    c = Array.fill(config.options.arraysize)(config.init._3)
+
+  private inline def indices = 0 until config.options.arraysize
+
+  override inline def copy(): Unit    = for i <- indices do c(i) = a(i)
+  override inline def mul(): Unit     = for i <- indices do b(i) = scalar * c(i)
+  override inline def add(): Unit     = for i <- indices do c(i) = a(i) + b(i)
+  override inline def triad(): Unit   = for i <- indices do a(i) = b(i) + (scalar * c(i))
+  // nstream accumulates into `a` (a += b + scalar * c), the update `App.validate` replays on its
+  // gold value; the previous `b * scalar * c` overwrote `a` and could not match it.
+  override inline def nstream(): Unit = for i <- indices do a(i) = a(i) + b(i) + scalar * c(i)
+  /** Dot product of `a` and `b`, accumulated sequentially. */
+  override inline def dot(): A =
+    var acc: A = 0.fractional
+    for i <- indices do acc = acc + (a(i) * b(i))
+    acc
+  /** Immutable snapshot of the three arrays, as consumed by `App.validate`. */
+  override inline def data(): Data[A] = Data(a.to(ArraySeq), b.to(ArraySeq), c.to(ArraySeq))
diff --git a/scala-stream/src/main/scala/scalastream/ScalaStream.scala b/scala-stream/src/main/scala/scalastream/ScalaStream.scala
new file mode 100644
index 0000000..4ed90e4
--- /dev/null
+++ b/scala-stream/src/main/scala/scalastream/ScalaStream.scala
@@ -0,0 +1,415 @@
+package scalastream
+import scalastream.App.{Config, Data, Timings}
+
+import java.util.concurrent.TimeUnit
+import scala.collection.immutable.ArraySeq
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.{Duration, FiniteDuration, SECONDS}
+import scala.math.{Pi, pow}
+import scala.reflect.ClassTag
+import scopt.OParser
+
+/** Common interface of every stream implementation, plus the drivers that time the kernels. */
+transparent trait ScalaStream[@specialized(Float, Double) A]:
+
+  def config: Config[A]
+
+  def initArrays(): Unit
+  def copy(): Unit
+  def mul(): Unit
+  def add(): Unit
+  def triad(): Unit
+  def nstream(): Unit
+  def dot(): A
+
+  /** Evaluates `f` once and returns the elapsed `System.nanoTime` interval with its result. */
+  transparent inline def timed[R](f: => R): (FiniteDuration, R) =
+    val start = System.nanoTime()
+    val r     = f
+    val end   = System.nanoTime()
+    FiniteDuration(end - start, TimeUnit.NANOSECONDS) -> r
+
+  /** Runs copy, mul, add, triad and dot once per iteration, `times` iterations in total.
+    *
+    * @return the per-iteration timings of each kernel and the dot result of the last iteration
+    */
+  inline def runAll(times: Int)(using Fractional[A]): (Timings[Vector[FiniteDuration]], A) =
+    val copy  = ArrayBuffer.fill[FiniteDuration](times)(Duration.Zero)
+    val mul   = ArrayBuffer.fill[FiniteDuration](times)(Duration.Zero)
+    val add   = ArrayBuffer.fill[FiniteDuration](times)(Duration.Zero)
+    val triad = ArrayBuffer.fill[FiniteDuration](times)(Duration.Zero)
+    val dot   = ArrayBuffer.fill[FiniteDuration](times)(Duration.Zero)
+
+    var lastSum: A = 0.fractional
+
+    for i <- 0 until times do
+      copy(i) = timed(this.copy())._1
+      mul(i) = timed(this.mul())._1
+      add(i) = timed(this.add())._1
+      triad(i) = timed(this.triad())._1
+      val (dot_, sum) = timed(this.dot())
+      dot(i) = dot_
+      lastSum = sum
+    val s = lastSum
+
+    (
+      Timings(
+        copy = copy.toVector,
+        mul = mul.toVector,
+        add = add.toVector,
+        triad = triad.toVector,
+        dot = dot.toVector
+      ),
+      s
+    )
+
+  /** Times `times` consecutive triad calls as one measurement. */
+  def runTriad(times: Int): FiniteDuration = timed(for _ <- 0 until times do triad())._1
+
+  /** Times each of `times` nstream calls separately. */
+  def runNStream(times: Int): Vector[FiniteDuration] = Vector.fill(times)(timed(nstream())._1)
+
+  /** Snapshot of the three arrays. */
+  def data(): Data[A]
+
+
+/** Fractional-number typeclass with inline operator syntax; instances exist for Float and Double. */
+trait Fractional[@specialized(Double, Float) A]:
+  def toFractional(f: Float): A
+  def toFractional(f: Double): A
+  def compare(x: A, y: A): Int
+  def add(x: A, y: A): A
+  def sub(x: A, y: A): A
+  def mul(x: A, y: A): A
+  def div(x: A, y: A): A
+  def abs(x: A): A
+  extension (x: Float) inline def fractional  = toFractional(x)
+  extension (x: Double) inline def fractional = toFractional(x)
+  // Int goes through Double: every Int is exactly representable as a Double, whereas going
+  // through Float rounds values above 2^24 (e.g. an array size used in the dot validation).
+  extension (x: Int) inline def fractional    = toFractional(x.toDouble)
+  extension (x: Long) inline def fractional   = toFractional(x.toDouble)
+  extension (x: A)
+    inline def +(y: A) = add(x, y)
+    inline def -(y: A) = sub(x, y)
+    inline def *(y: A) = mul(x, y)
+    inline def /(y: A) = div(x, y)
+    inline def >(y: A) = compare(x, y) > 0
+    inline def <(y: A) = compare(x, y) < 0
+    inline def abs_    = abs(x)
+end Fractional
+
+given FloatFractional: Fractional[Float] with
+  inline def toFractional(f: Float): Float     = f
+  inline def toFractional(f: Double): Float    = f.toFloat
+  inline def compare(x: Float, y: Float): Int  = x.compare(y)
+  inline def add(x: Float, y: Float): Float    = x + y
+  inline def sub(x: Float, y: Float): Float    = x - y
+  inline def mul(x: Float, y: Float): Float    = x * y
+  inline def div(x: Float, y: Float): Float    = x / y
+  inline def abs(x: Float): Float              = math.abs(x)
+
+given DoubleFractional: Fractional[Double] with
+  inline def toFractional(f: Float): Double      = f.toDouble
+  inline def toFractional(f: Double): Double     = f
+  inline def compare(x: Double, y: Double): Int  = x.compare(y)
+  inline def add(x: Double, y: Double): Double   = x + y
+  inline def sub(x: Double, y: Double): Double   = x - y
+  inline def mul(x: Double, y: Double): Double   = x * y
+  inline def div(x: Double, y: Double): Double   = x / y
+  inline def abs(x: Double): Double              = math.abs(x)
+
+object App:
+
+  final val Version: String = "3.4.0"
+
+  /** Settings of one benchmark run.
+    *
+    * @param typeSize size of one array element in bytes
+    * @param ulp      base of the validation tolerance; `validate` accepts errors up to `100 * ulp`
+    * @param scalar   multiplier used by the mul, triad and nstream kernels
+    * @param init     initial values of the arrays a, b and c
+    */
+  case class Config[@specialized(Double, Float) A](
+      options: Options,
+      benchmark: Benchmark,
+      typeSize: Int,
+      ulp: A,
+      scalar: A,
+      init: (A, A, A)
+  )
+
+  case class Timings[A](copy: A, mul: A, add: A, triad: A, dot: A)
+  case class Data[A](@specialized(Double, Float) a: ArraySeq[A], b: ArraySeq[A], c: ArraySeq[A])
+
+  case class Options(
+      list: Boolean = false,
+      device: Int = 0,
+      numtimes: Int = 100,
+      arraysize: Int = 33554432,
+      float: Boolean = false,
+      triad_only: Boolean = false,
+      nstream_only: Boolean = false,
+      csv: Boolean = false,
+      mibibytes: Boolean = false
+  )
+
+  object Options:
+    val Default = Options()
+    val builder = OParser.builder[Options]
+    val parser1 =
+      import builder._
+      OParser.sequence(
+        programName("scala-stream"),
+        head("ScalaStream", s"$Version"),
+        opt[Unit]('l', "list").text("List available devices").action((_, x) => x.copy(list = true)),
+        opt[Int]('d', "device")
+          .text(s"Select device at INDEX, defaults to ${Default.device}")
+          .action((v, x) => x.copy(device = v)),
+        opt[Int]('n', "numtimes")
+          .text(s"Run the test NUM times (NUM >= 2), defaults to ${Default.numtimes}")
+          .validate {
+            case n if n >= 2 => success
+            case n           => failure(s"numtimes must be >= 2, got $n")
+          }
+          .action((n, x) => x.copy(numtimes = n)),
+        opt[Int]('a', "arraysize")
+          .text(s"Use SIZE elements in the array, defaults to ${Default.arraysize}")
+          .action((v, x) => x.copy(arraysize = v)),
+        opt[Unit]('f', "float")
+          .text("Use floats (rather than doubles)")
+          .action((_, x) => x.copy(float = true)),
+        opt[Unit]('t', "triad_only")
+          .text("Only run triad")
+          .action((_, x) => x.copy(triad_only = true)),
+        // Long form only: the short name 'n' is already taken by --numtimes above.
+        opt[Unit]("nstream_only")
+          .text("Only run nstream")
+          .action((_, x) => x.copy(nstream_only = true)),
+        opt[Unit]('c', "csv").text("Output as csv table").action((_, x) => x.copy(csv = true)),
+        opt[Unit]('m', "mibibytes")
+          .text("Use MiB=2^20 for bandwidth calculation (default MB=10^6)")
+          .action((_, x) => x.copy(mibibytes = true)),
+        help('h', "help").text("prints this usage text")
+      )
+
+  enum Benchmark:
+    case All, NStream, Triad
+
+  implicit class RichDuration(private val d: Duration) extends AnyVal:
+    def seconds: Double = d.toUnit(SECONDS)
+
+  /** Checks the benchmark output against values recomputed on scalars.
+    *
+    * The selected benchmark's updates are replayed `numtimes` times on three gold values starting
+    * from `config.init`; each array is then compared by average absolute error and the dot result,
+    * when given, by relative error. Mismatches are printed to stderr; nothing is thrown.
+    *
+    * @param vec    arrays produced by the benchmark
+    * @param config configuration the benchmark ran with
+    * @param dotSum dot result to check, if any
+    */
+  def validate[A: Fractional](vec: Data[A], config: Config[A], dotSum: Option[A] = None): Unit =
+
+    var (goldA, goldB, goldC) = config.init
+    for _ <- 0 until config.options.numtimes do
+      config.benchmark match
+        case Benchmark.All =>
+          goldC = goldA
+          goldB = config.scalar * goldC
+          goldC = goldA + goldB
+          goldA = goldB + config.scalar * goldC
+        case Benchmark.Triad =>
+          goldA = goldB + config.scalar * goldC
+        case Benchmark.NStream =>
+          goldA += goldB + config.scalar * goldC
+
+    val tolerance = config.ulp * (100.fractional)
+    def validateXs(name: String, xs: Seq[A], from: A): Unit =
+      val error = xs.map(x => (x - from).abs_).fold(0.fractional)(_ + _) / xs.size.fractional
+      if error > tolerance then
+        Console.err.println(s"Validation failed on $name. Average error $error ")
+
+    validateXs("a", vec.a, goldA)
+    validateXs("b", vec.b, goldB)
+    validateXs("c", vec.c, goldC)
+
+    dotSum.foreach { sum =>
+      val goldSum = (goldA * goldB) * (config.options.arraysize).fractional
+      val error   = ((sum - goldSum) / goldSum).abs_
+      if error > 1.fractional / 100000000.fractional then
+        Console.err.println(
+          s"Validation failed on sum. Error $error \nSum was $sum but should be $goldSum"
+        )
+    }
+
+  /** Runs the configured benchmark on one implementation, validates it and prints the results.
+    *
+    * @param name     implementation name shown in the header
+    * @param config   benchmark configuration
+    * @param mkStream factory creating the implementation from the configuration
+    */
+  inline def run[A: Fractional: ClassTag](
+      name: String,
+      config: Config[A],
+      mkStream: Config[A] => ScalaStream[A]
+  ): Unit =
+
+    val opt = config.options
+
+    // Long, not Int: arraysize * typeSize * 3 exceeds Int.MaxValue for large arrays.
+    val arrayBytes: Long = opt.arraysize.toLong * config.typeSize
+    val totalBytes: Long = arrayBytes * 3
+    val (megaScale, megaSuffix, gigaScale, gigaSuffix) =
+      if !opt.mibibytes then (1.0e-6, "MB", 1.0e-9, "GB")
+      else (pow(2.0, -20), "MiB", pow(2.0, -30), "GiB")
+
+    if !opt.csv then
+
+      val vendor = System.getProperty("java.vendor")
+      val ver    = System.getProperty("java.version")
+      val home   = System.getProperty("java.home")
+      println(
+        s"""BabelStream
+           |Version: $Version
+           |Implementation: $name; Scala (Java $ver; $vendor; home=$home)""".stripMargin
+      )
+
+      println(s"Running ${config.benchmark match {
+        case Benchmark.All     => "kernels"
+        case Benchmark.Triad   => "triad"
+        case Benchmark.NStream => "nstream"
+      }} ${opt.numtimes} times")
+
+      if config.benchmark == Benchmark.Triad then println(s"Number of elements: ${opt.arraysize}")
+
+      println(s"Precision: ${if opt.float then "float" else "double"}")
+      println(
+        f"Array size: ${megaScale * arrayBytes}%.1f $megaSuffix (=${gigaScale * arrayBytes}%.1f $gigaSuffix)"
+      )
+      println(
+        f"Total size: ${megaScale * totalBytes}%.1f $megaSuffix (=${gigaScale * totalBytes}%.1f $gigaSuffix)"
+      )
+
+    // One result row as (column name, value) pairs; the first timing is dropped before min/max/avg.
+    def mkRow(xs: Vector[FiniteDuration], name: String, totalBytes: Long) =
+      val tail = xs.drop(1)
+      (tail.minOption.map(_.seconds), tail.maxOption.map(_.seconds)) match
+        case (Some(min), Some(max)) =>
+          val avg  = (tail.foldLeft(Duration.Zero)(_ + _) / tail.size.toDouble).seconds
+          val mbps = megaScale * totalBytes.toDouble / min
+          if opt.csv then
+            Vector(
+              "function"                                                -> name,
+              "num_times"                                               -> opt.numtimes.toString,
+              "n_elements"                                              -> opt.arraysize.toString,
+              "sizeof"                                                  -> totalBytes.toString,
+              s"max_m${if opt.mibibytes then "i" else ""}bytes_per_sec" -> mbps.toString,
+              "min_runtime"                                             -> min.toString,
+              "max_runtime"                                             -> max.toString,
+              "avg_runtime"                                             -> avg.toString
+            )
+          else
+            Vector(
+              "Function"                                        -> name,
+              s"M${if opt.mibibytes then "i" else ""}Bytes/sec" -> f"$mbps%.3f",
+              "Min (sec)"                                       -> f"$min%.5f",
+              "Max"                                             -> f"$max%.5f",
+              "Average"                                         -> f"$avg%.5f"
+            )
+        case (_, _) => sys.error(s"No min/max element for $name(size=$totalBytes)")
+
+    // Prints the column names of the first row, then the values of every row.
+    def tabulate(rows: Vector[(String, String)]*): Unit = rows.toList match
+      case Nil => sys.error(s"Empty tabulation")
+      case header :: _ =>
+        val padding = if opt.csv then 0 else 12
+        val sep     = if opt.csv then "," else ""
+        println(header.map(_._1.padTo(padding, ' ')).mkString(sep))
+        println(rows.map(_.map(_._2.padTo(padding, ' ')).mkString(sep)).mkString("\n"))
+
+    val stream = mkStream(config)
+    stream.initArrays()
+    config.benchmark match
+      case Benchmark.All =>
+        val (results, sum) = stream.runAll(opt.numtimes)
+        validate(stream.data(), config, Some(sum))
+        tabulate(
+          mkRow(results.copy, "Copy", 2 * arrayBytes),
+          mkRow(results.mul, "Mul", 2 * arrayBytes),
+          mkRow(results.add, "Add", 3 * arrayBytes),
+          mkRow(results.triad, "Triad", 3 * arrayBytes),
+          mkRow(results.dot, "Dot", 2 * arrayBytes)
+        )
+      case Benchmark.NStream =>
+        val result = stream.runNStream(opt.numtimes)
+        validate(stream.data(), config)
+        tabulate(mkRow(result, "Nstream", 4 * arrayBytes))
+      case Benchmark.Triad =>
+        val results    = stream.runTriad(opt.numtimes)
+        // Sum in Double: 3 * arrayBytes * numtimes overflows Int with the default options, and
+        // the value is printed with the giga suffix, so it must use gigaScale.
+        val totalBytes = 3.0 * arrayBytes * opt.numtimes
+        val bandwidth  = gigaScale * (totalBytes / results.seconds)
+        println(f"Runtime (seconds): ${results.seconds}%.5f")
+        println(f"Bandwidth ($gigaSuffix/s): $bandwidth%.3f ")
+
+  /** Available implementations; the position in this vector is the `--device` index. */
+  inline def devices[A: Fractional: ClassTag]: Vector[(String, Config[A] => ScalaStream[A])] =
+    Vector(
+      "Scala Parallel Collections" -> (ParStream(_)),
+      "Java 8 Stream"              -> (J8SStream(_)),
+      "Threaded"                   -> (ThreadStream(_)),
+      "Serial"                     -> (PlainStream(_))
+    )
+
+  /** Runs the benchmark on device `i`, or prints a message when the index is out of bounds. */
+  inline def runWith[A: Fractional: ClassTag](i: Int, config: Config[A]): Unit =
+    devices[A].lift(i) match
+      case None                   => println(s"Device index out of bounds: $i")
+      case Some((name, mkStream)) => run(name, config, mkStream)
+
+  def main(args: Array[String]): Unit =
+
+    def handleOpt(opt: Options) =
+      val benchmark = (opt.nstream_only, opt.triad_only) match
+        case (true, false)  => Benchmark.NStream
+        case (false, true)  => Benchmark.Triad
+        case (false, false) => Benchmark.All
+        case (true, true) =>
+          throw new RuntimeException(
+            "Both triad and nstream are enabled, pick one or omit both to run all benchmarks"
+          )
+
+      if opt.list then
+        devices[Float].zipWithIndex.foreach { case ((name, _), i) => println(s"$i: $name") }
+      else if opt.float then
+        runWith(
+          opt.device,
+          Config(
+            options = opt,
+            benchmark = benchmark,
+            typeSize = 4, // 32bit
+            // ulp(1.0f) is the machine epsilon; ulp(Float.MaxValue) (~2e31) made the tolerance
+            // so large that validation could never fail.
+            ulp = math.ulp(1.0f),
+            scalar = 0.4f,
+            init = (0.1f, 0.2f, 0.0f)
+          )
+        )
+      else
+        runWith(
+          opt.device,
+          Config(
+            options = opt,
+            benchmark = benchmark,
+            typeSize = 8, // 64bit
+            ulp = math.ulp(1.0),
+            scalar = 0.4,
+            init = (0.1, 0.2, 0.0)
+          )
+        )
+
+    OParser.parse(Options.parser1, args, Options.Default) match
+      case Some(config) => handleOpt(config)
+      case _            => sys.exit(1)
diff --git 
a/scala-stream/src/main/scala/scalastream/ThreadStream.scala b/scala-stream/src/main/scala/scalastream/ThreadStream.scala
new file mode 100644
index 0000000..969a71f
--- /dev/null
+++ b/scala-stream/src/main/scala/scalastream/ThreadStream.scala
@@ -0,0 +1,88 @@
+package scalastream
+
+import net.openhft.affinity.{AffinityStrategies, AffinityThreadFactory}
+import scalastream.App.{Config, Data}
+
+import java.util.concurrent.{Callable, Executors}
+import scala.collection.immutable.ArraySeq
+import scala.reflect.ClassTag
+object ThreadStream {}
+
+/** BabelStream kernels parallelised by hand on a fixed-size executor pool.
+  *
+  * The index range is split into contiguous groups when the instance is created; every kernel
+  * submits one task per group and waits for all of them to finish.
+  *
+  * @param config supplies the array size, the scalar and the initial values of the arrays
+  */
+class ThreadStream[@specialized(Float, Double) A: Fractional: ClassTag](val config: Config[A])
+    extends ScalaStream[A]:
+
+  private var a: Array[A] = _
+  private var b: Array[A] = _
+  private var c: Array[A] = _
+  private val scalar: A   = config.scalar
+
+  private val chunks: Int = sys.runtime.availableProcessors()
+
+  private val pool = Executors.newFixedThreadPool(
+    chunks,
+    new AffinityThreadFactory("scala-stream", true, AffinityStrategies.DIFFERENT_CORE)
+  )
+
+  // Group size is clamped to at least 1: `grouped(0)` throws when arraysize < chunks.
+  private val indices = (0 until config.options.arraysize)
+    .grouped(math.max(1, config.options.arraysize / chunks))
+    .toSeq
+
+  /** Runs `f` for every index on the pool and returns one context value per index group.
+    *
+    * @param c by-name context, evaluated once per group inside that group's task
+    * @param f called with the group's context and each index of the group
+    */
+  private inline def forEachAll[C](c: => C)(f: (C, Int) => Unit): Seq[C] =
+    import scala.jdk.CollectionConverters._
+    val xs = pool
+      .invokeAll(
+        indices.map { r =>
+          { () =>
+            val ctx = c
+            r.foreach(f(ctx, _))
+            ctx
+          }: Callable[C]
+        }.asJavaCollection
+      )
+      .asScala
+      .map(_.get())
+      .toSeq
+    xs
+
+  /** Allocates the three arrays and fills them on the pool from `config.init`. */
+  override inline def initArrays(): Unit =
+    a = Array.ofDim(config.options.arraysize)
+    b = Array.ofDim(config.options.arraysize)
+    c = Array.ofDim(config.options.arraysize)
+    forEachAll(()) { (_, i) =>
+      a(i) = config.init._1
+      b(i) = config.init._2
+      c(i) = config.init._3
+    }
+    ()
+
+  // Mutable per-group accumulator used by `dot`.
+  class Box(var value: A)
+  override inline def copy(): Unit    = { forEachAll(())((_, i) => c(i) = a(i)); () }
+  override inline def mul(): Unit     = { forEachAll(())((_, i) => b(i) = scalar * c(i)); () }
+  override inline def add(): Unit     = { forEachAll(())((_, i) => c(i) = a(i) + b(i)); () }
+  override inline def triad(): Unit   = { forEachAll(())((_, i) => a(i) = b(i) + scalar * c(i)); () }
+  // nstream accumulates into `a` (a += b + scalar * c), the update `App.validate` replays on its
+  // gold value; the previous `b * scalar * c` overwrote `a` and could not match it.
+  override inline def nstream(): Unit = { forEachAll(())((_, i) => a(i) = a(i) + b(i) + scalar * c(i)); () }
+
+  /** Dot product of `a` and `b`: one partial sum per index group, then summed. */
+  override inline def dot(): A =
+    forEachAll(Box(0.fractional))((acc, i) => acc.value = acc.value + (a(i) * b(i)))
+      .map(_.value)
+      .fold(0.fractional)(_ + _)
+  /** Immutable snapshot of the three arrays, as consumed by `App.validate`. */
+  override inline def data(): Data[A] = Data(a.to(ArraySeq), b.to(ArraySeq), c.to(ArraySeq))