52 changes: 52 additions & 0 deletions .github/workflows/scala.yml
@@ -0,0 +1,52 @@
name: Build spark-sql-perf

on:
push:
branches:
- master
pull_request:
branches:
- master

jobs:
build:
runs-on: ubuntu-22.04

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'temurin'

- name: Cache sbt
uses: actions/cache@v4
with:
path: |
~/.ivy2/cache
~/.sbt
~/.coursier
key: ${{ runner.os }}-sbt-${{ hashFiles('**/build.sbt') }}
restore-keys: |
${{ runner.os }}-sbt-

- name: Build with sbt
run: ./build/sbt compile

- name: Package with sbt
run: ./build/sbt package

- name: Extract version
id: extract_version
run: |
version=$(grep 'ThisBuild / version :=' version.sbt | awk -F'"' '{print $2}')
echo "version=$version" >> "$GITHUB_ENV"

- name: Upload JAR artifact
uses: actions/upload-artifact@v4
with:
name: spark-sql-perf_2.12-${{ env.version }}.jar
path: target/scala-2.12/*.jar
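
Editor's note on the "Extract version" step above: it pulls the quoted version string out of version.sbt with grep and awk. A minimal Scala sketch of the same parse, for reference only; the object name and hard-coded filename are illustrative, not part of this PR:

import scala.io.Source

// Hypothetical helper mirroring the workflow's grep | awk pipeline:
// find the line containing `version :=` and take the text between
// the first pair of double quotes (awk -F'"' '{print $2}').
object ExtractVersion {
  def main(args: Array[String]): Unit = {
    val source = Source.fromFile("version.sbt")
    try {
      val version = source.getLines()
        .find(_.contains("version :="))
        .map(_.split('"')(1))
        .getOrElse(sys.error("no version line found in version.sbt"))
      println(version) // e.g. 0.5.2-SNAPSHOT
    } finally source.close()
  }
}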
2 changes: 2 additions & 0 deletions .gitignore
@@ -19,3 +19,5 @@ src_managed/
project/boot/
project/plugins/project/
performance/
/.bloop/
/build/*.zip
2 changes: 1 addition & 1 deletion bin/run
@@ -3,4 +3,4 @@
# runs spark-sql-perf from the current directory

ARGS="runBenchmark $@"
build/sbt "$ARGS"
sbt "$ARGS"
66 changes: 34 additions & 32 deletions build.sbt
@@ -5,65 +5,64 @@ name := "spark-sql-perf"

organization := "com.databricks"

scalaVersion := "2.12.10"
scalaVersion := "2.12.18"

crossScalaVersions := Seq("2.12.10")
crossScalaVersions := Seq("2.12.18")

sparkPackageName := "databricks/spark-sql-perf"
// Remove publishing configuration for now - focus on compilation
// sparkPackageName := "databricks/spark-sql-perf"

// All Spark Packages need a license
licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"))

sparkVersion := "3.0.0"
// Spark version - define it manually since we removed the spark-packages plugin
val sparkVersion = "3.5.1"

sparkComponents ++= Seq("sql", "hive", "mllib")
// Add Spark dependencies manually
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
)


initialCommands in console :=
console / initialCommands :=
"""
|import org.apache.spark.sql._
|import org.apache.spark.sql.functions._
|import org.apache.spark.sql.types._
|import org.apache.spark.sql.hive.test.TestHive
|import TestHive.implicits
|import TestHive.sql
|import org.apache.spark.sql.SparkSession
|
|val sqlContext = TestHive
|val spark = SparkSession.builder().appName("spark-sql-perf").getOrCreate()
|val sqlContext = spark.sqlContext
|import sqlContext.implicits._
""".stripMargin

libraryDependencies += "com.github.scopt" %% "scopt" % "3.7.1"
libraryDependencies += "com.github.scopt" %% "scopt" % "4.1.0"

libraryDependencies += "com.twitter" %% "util-jvm" % "6.45.0" % "provided"
libraryDependencies += "com.twitter" %% "util-jvm" % "24.2.0" % "provided"

libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.19" % "test"

libraryDependencies += "org.yaml" % "snakeyaml" % "1.23"
libraryDependencies += "org.yaml" % "snakeyaml" % "2.5"

fork := true

// Your username to login to Databricks Cloud
dbcUsername := sys.env.getOrElse("DBC_USERNAME", "")

// Your password (Can be set as an environment variable)
dbcPassword := sys.env.getOrElse("DBC_PASSWORD", "")

// The URL to the Databricks Cloud DB Api. Don't forget to set the port number to 34563!
dbcApiUrl := sys.env.getOrElse ("DBC_URL", sys.error("Please set DBC_URL"))

// Add any clusters that you would like to deploy your work to. e.g. "My Cluster"
// or run dbcExecuteCommand
dbcClusters += sys.env.getOrElse("DBC_USERNAME", "")

dbcLibraryPath := s"/Users/${sys.env.getOrElse("DBC_USERNAME", "")}/lib"
// Remove Databricks Cloud configuration for now
// dbcUsername := sys.env.getOrElse("DBC_USERNAME", "")
// dbcPassword := sys.env.getOrElse("DBC_PASSWORD", "")
// dbcApiUrl := sys.env.getOrElse ("DBC_URL", sys.error("Please set DBC_URL"))
// dbcClusters += sys.env.getOrElse("DBC_USERNAME", "")
// dbcLibraryPath := s"/Users/${sys.env.getOrElse("DBC_USERNAME", "")}/lib"

val runBenchmark = inputKey[Unit]("runs a benchmark")

runBenchmark := {
import complete.DefaultParsers._
val args = spaceDelimited("[args]").parsed
val scalaRun = (runner in run).value
val classpath = (fullClasspath in Compile).value
val scalaRun = (Compile / run / runner).value
val classpath = (Compile / fullClasspath).value
scalaRun.run("com.databricks.spark.sql.perf.RunBenchmark", classpath.map(_.data), args,
streams.value.log)
}
@@ -74,13 +73,15 @@ val runMLBenchmark = inputKey[Unit]("runs an ML benchmark")
runMLBenchmark := {
import complete.DefaultParsers._
val args = spaceDelimited("[args]").parsed
val scalaRun = (runner in run).value
val classpath = (fullClasspath in Compile).value
val scalaRun = (Compile / run / runner).value
val classpath = (Compile / fullClasspath).value
scalaRun.run("com.databricks.spark.sql.perf.mllib.MLLib", classpath.map(_.data), args,
streams.value.log)
}


// Comment out release configuration for now
/*
import ReleaseTransformations._

/** Push to the team directory instead of the user's homedir for releases. */
@@ -159,3 +160,4 @@ releaseProcess := Seq[ReleaseStep](
commitNextVersion,
pushChanges
)
*/
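
The recurring change in build.sbt is the sbt 1.x migration from `key in Scope` to the slash syntax `Scope / key`. A minimal, self-contained sketch of the idiom used by runBenchmark above; the task name and main class here are illustrative, not from this repo:

// build.sbt sketch, assuming sbt 1.x
val runTool = inputKey[Unit]("runs a main class with space-delimited args")

runTool := {
  import complete.DefaultParsers._
  val args      = spaceDelimited("[args]").parsed
  val scalaRun  = (Compile / run / runner).value   // was: (runner in run).value
  val classpath = (Compile / fullClasspath).value  // was: (fullClasspath in Compile).value
  scalaRun.run("com.example.Tool", classpath.map(_.data), args, streams.value.log)
}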
2 changes: 1 addition & 1 deletion build/sbt
@@ -153,4 +153,4 @@ trap onExit INT
run "$@"

exit_status=$?
onExit
onExit
17 changes: 9 additions & 8 deletions build/sbt-launch-lib.bash
@@ -45,23 +45,24 @@ dlog () {

acquire_sbt_jar () {
SBT_VERSION=`awk -F "=" '/sbt\.version/ {print $2}' ./project/build.properties`
URL1=https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
URL1=https://github.com/sbt/sbt/releases/download/v${SBT_VERSION}/sbt-${SBT_VERSION}.zip
JAR=build/sbt-launch-${SBT_VERSION}.jar

sbt_jar=$JAR

if [[ ! -f "$sbt_jar" ]]; then
# Download sbt launch jar if it hasn't been downloaded yet
if [ ! -f "${JAR}" ]; then
# Download
printf "Attempting to fetch sbt\n"
JAR_DL="${JAR}.part"
COMPLETE_SBT="build/sbt.zip"
if [ $(command -v curl) ]; then
curl --fail --location --silent ${URL1} > "${JAR_DL}" &&\
mv "${JAR_DL}" "${JAR}"
curl --fail --location --silent ${URL1} > "${COMPLETE_SBT}" &&\
unzip ${COMPLETE_SBT} &&\
cp "sbt/bin/sbt-launch.jar" "${JAR}"
elif [ $(command -v wget) ]; then
wget --quiet ${URL1} -O "${JAR_DL}" &&\
mv "${JAR_DL}" "${JAR}"
wget --quiet ${URL1} -O "${COMPLETE_SBT}" &&\
unzip ${COMPLETE_SBT} &&\
cp "sbt/bin/sbt-launch.jar" "${JAR}"
else
printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
exit -1
@@ -195,4 +196,4 @@ run() {
-jar "$sbt_jar" \
"${sbt_commands[@]}" \
"${residual_args[@]}"
}
}
3 changes: 1 addition & 2 deletions project/build.properties
@@ -1,2 +1 @@
// This file should only contain the version of sbt to use.
sbt.version=0.13.18
sbt.version=1.10.6
20 changes: 11 additions & 9 deletions project/plugins.sbt
@@ -1,17 +1,19 @@
// You may use this file to add plugin dependencies for sbt.

resolvers += "Spark Packages repo" at "https://repos.spark-packages.org/"
resolvers ++= Seq(
"sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/",
"Spark Packages Repo" at "https://repos.spark-packages.org/"
)

resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"
// Remove incompatible plugins for now
// addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.3")

addSbtPlugin("org.spark-packages" %% "sbt-spark-package" % "0.1.1")
// addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")

addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")
// addSbtPlugin("com.github.sbt" % "sbt-release" % "1.0.15")

addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0")
// addSbtPlugin("com.databricks" %% "sbt-databricks" % "0.1.5")

addSbtPlugin("com.databricks" %% "sbt-databricks" % "0.1.3")
// addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.6")

addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")

addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
// addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.1.2")
5 changes: 3 additions & 2 deletions src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
@@ -212,7 +212,7 @@ abstract class Benchmark(
new SparkPerfExecution(
name,
Map.empty,
() => Unit,
() => (),
() => rdd.count(),
rdd.toDebugString)
}
@@ -240,7 +240,8 @@
protected override def doBenchmark(
includeBreakdown: Boolean,
description: String = "",
messages: ArrayBuffer[String]): BenchmarkResult = {
messages: ArrayBuffer[String],
iteration: Int = 1): BenchmarkResult = {
try {
val timeMs = measureTimeMs(run())
BenchmarkResult(
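
The `() => Unit` → `() => ()` changes in this file (and in the DatasetPerformance and Query diffs below) fix a classic Scala pitfall: in expression position, `Unit` names the Unit companion object, so `() => Unit` is a thunk whose body is that object (silently value-discarded), not an intentionally empty thunk, and recent compilers warn on or reject the discarded pure expression. A tiny demo, assuming Scala 2.12+:

object UnitThunkDemo extends App {
  // Body is the Unit companion object, value-discarded; compiles, but
  // triggers "a pure expression does nothing" style warnings.
  val suspicious: () => Unit = () => Unit
  // The real empty thunk: its body is the unit value itself.
  val empty: () => Unit = () => ()
  suspicious(); empty() // both do nothing, only the second is idiomatic
}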
10 changes: 6 additions & 4 deletions src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala
@@ -43,14 +43,15 @@ trait Benchmarkable {
description: String = "",
messages: ArrayBuffer[String],
timeout: Long,
forkThread: Boolean = true): BenchmarkResult = {
forkThread: Boolean = true,
iteration: Int = 1): BenchmarkResult = {
logger.info(s"$this: benchmark")
sparkContext.setJobDescription(s"Execution: $name, $description")
beforeBenchmark()
val result = if (forkThread) {
runBenchmarkForked(includeBreakdown, description, messages, timeout)
} else {
doBenchmark(includeBreakdown, description, messages)
doBenchmark(includeBreakdown, description, messages, iteration)
}
afterBenchmark(sqlContext.sparkContext)
result
@@ -85,7 +86,7 @@ trait Benchmarkable {
mode = executionMode.toString,
parameters = Map.empty,
failure = Some(Failure(e.getClass.getSimpleName,
e.getMessage + ":\n" + e.getStackTraceString)))
e.getMessage + ":\n" + e.getStackTrace.mkString("\n"))))
}
}
}
@@ -107,7 +108,8 @@ trait Benchmarkable {
protected def doBenchmark(
includeBreakdown: Boolean,
description: String = "",
messages: ArrayBuffer[String]): BenchmarkResult
messages: ArrayBuffer[String],
iteration: Int = 1): BenchmarkResult

protected def measureTimeMs[A](f: => A): Double = {
val startTime = System.nanoTime()
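
This signature change threads a new `iteration: Int = 1` parameter from `benchmark(...)` down to every `doBenchmark` override; the default keeps existing call sites source-compatible. A stripped-down sketch of the shape, with a simplified trait and result type standing in for the real Benchmarkable and BenchmarkResult:

import scala.collection.mutable.ArrayBuffer

case class Result(name: String, iteration: Int, timeMs: Double)

trait MiniBenchmarkable {
  def name: String

  // Overrides receive the iteration number; the default means callers
  // written before the change still compile unchanged.
  protected def doBenchmark(messages: ArrayBuffer[String], iteration: Int = 1): Result

  def benchmark(messages: ArrayBuffer[String], iteration: Int = 1): Result =
    doBenchmark(messages, iteration)
}

object IterationDemo extends App {
  val b = new MiniBenchmarkable {
    val name = "noop"
    protected def doBenchmark(messages: ArrayBuffer[String], iteration: Int): Result = {
      messages += s"$name: iteration $iteration"
      Result(name, iteration, timeMs = 0.0)
    }
  }
  val log = ArrayBuffer.empty[String]
  println(b.benchmark(log, iteration = 3)) // Result(noop,3,0.0)
}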
@@ -133,7 +133,7 @@ class DatasetPerformance extends Benchmark {
new SparkPerfExecution(
"RDD: average",
Map.empty,
prepare = () => Unit,
prepare = () => (),
run = () => {
val sumAndCount =
smallrdd.map(i => (i, 1)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
5 changes: 3 additions & 2 deletions src/main/scala/com/databricks/spark/sql/perf/Query.scala
@@ -62,7 +62,8 @@ class Query(
protected override def doBenchmark(
includeBreakdown: Boolean,
description: String = "",
messages: ArrayBuffer[String]): BenchmarkResult = {
messages: ArrayBuffer[String],
iteration: Int = 1): BenchmarkResult = {
try {
val dataFrame = buildDataFrame
val queryExecution = dataFrame.queryExecution
@@ -92,7 +93,7 @@
messages += s"Breakdown: ${node.simpleString(maxFields)}"
val newNode = buildDataFrame.queryExecution.executedPlan.p(index)
val executionTime = measureTimeMs {
newNode.execute().foreach((row: Any) => Unit)
newNode.execute().foreach((row: Any) => ())
}
timeMap += ((index, executionTime))

@@ -45,7 +45,8 @@ class MLPipelineStageBenchmarkable(
override protected def doBenchmark(
includeBreakdown: Boolean,
description: String,
messages: ArrayBuffer[String]): BenchmarkResult = {
messages: ArrayBuffer[String],
iteration: Int = 1): BenchmarkResult = {
try {
val (trainingTime, model: Transformer) = measureTime {
logger.info(s"$this: train: trainingSet=${trainingData.schema}")
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version in ThisBuild := "0.5.1-SNAPSHOT"
ThisBuild / version := "0.5.2-SNAPSHOT"