This article covers:
- SBT setup
- Implementing a Spark trait that parses command-line arguments and creates a Spark session
- Testing Spark jobs and functions
More often than not I see companies and engineers struggle to find a good structure for their Spark applications. Code is riddled with side effects, such as I/O mixed with logic, which makes the Spark logic impossible to unit test. Without a structure that separates concerns, modularity is lost and technical debt grows.
These principles are hard to follow in general, and a computation framework such as Spark does not make it easier, since each job usually boils down to input -> transform -> output.
This post proposes a structure and some best practices that try to address these issues.
To get started and have a look at the project structure, clone the repository on GitHub.
The first few lines in build.sbt are the most important lines to bootstrap your application. They contain the project name and the Spark dependencies.
These lines define the organization of your project and are needed to upload a successful build to a binary store; more on that later. We use Scala 2.11.11 since that is the version Spark is compiled against, and as of writing the latest available Spark version is
The dependencies make sure that Spark is available on the classpath for compilation. The scope is Provided, however, since we assume that a Spark cluster is already running wherever we deploy the application. Scopt is a very useful library for reading command-line arguments; we pull it in on the default scope, Compile. Last but not least, scalatest is pulled in to help write proper unit and integration tests. Since this library is only needed for testing, the scope is limited to it. The it configuration must be enabled before the it command is recognized, which is done by these two lines:
Defaults.itSettings
lazy val root = project.in(file(".")).configs(IntegrationTest)
This enables the command
$ sbt it:test
to run all integration tests under folder
Test / testOptions += Tests.Argument("-oD")
IntegrationTest / testOptions += Tests.Argument("-oD")
are helpful utility lines that enable time measurement for each test, so that sbt can print the durations in the test report.
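Put together, the relevant part of build.sbt could look roughly like the sketch below. The project name, the organization value and every version number except the Scala version are assumptions made for illustration; check the repository for the actual values.

```scala
// build.sbt (sketch, not the repository's actual file)
name := "spark-bootstrap"        // hypothetical project name
organization := "thw.vancann"    // assumption, derived from the package path
scalaVersion := "2.11.11"

// Assumption: use whichever Spark version your cluster runs
val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  // Provided: needed to compile, but supplied by the cluster at runtime
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql"  % sparkVersion % Provided,
  // No explicit scope means the default Compile scope, so it ends up in the jar
  "com.github.scopt" %% "scopt"      % "3.7.0",
  // Only available to unit tests (test) and integration tests (it)
  "org.scalatest"    %% "scalatest"  % "3.0.4" % "test,it"
)

// Enable the it configuration so that `sbt it:test` is recognized
Defaults.itSettings
lazy val root = project.in(file(".")).configs(IntegrationTest)

// -oD: report to standard out and include the duration of each test
Test / testOptions += Tests.Argument("-oD")
IntegrationTest / testOptions += Tests.Argument("-oD")
```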
Since we're deploying the application on some cluster, all dependencies that are not available on the classpath of the cluster must be packed into the jar. The sbt-assembly plugin is used for that. The project/assembly.sbt file consists of a single line:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
which enables the command:
$ sbt assembly
This command runs all your tests and packages all Compile-scope libraries into the jar, creating a fat jar.
For more information, refer to the GitHub page for sbt-assembly.
The main spark
src/main/scala/thw/vancann/SparkJob.scala. It essentially does 2 things:
- Read in and parse any optional and required command line arguments into a
- Start a SparkSession, initialize a Storage object and call the
The only thing actual jobs need to do is implement the functions
src/main/scala/thw/vancann/UsageConfig.scala is the file containing the UsageConfig case class and the parser that parses the command-line arguments. For more information about scopt, please refer to their GitHub page.
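To give an idea of what such a parser looks like, here is a minimal sketch using the scopt 3.x OptionParser API. The fields input, output and verbose, the program name and the object name UsageConfigParser are hypothetical; the UsageConfig in the repository may well define different arguments.

```scala
import scopt.OptionParser

// Hypothetical fields, for illustration only
case class UsageConfig(input: String = "", output: String = "", verbose: Boolean = false)

object UsageConfigParser {
  private val parser = new OptionParser[UsageConfig]("spark-job") {
    head("spark-job", "example")

    opt[String]('i', "input")
      .required()
      .action((value, config) => config.copy(input = value))
      .text("path to read from (required)")

    opt[String]('o', "output")
      .required()
      .action((value, config) => config.copy(output = value))
      .text("path to write to (required)")

    opt[Unit]("verbose")
      .action((_, config) => config.copy(verbose = true))
      .text("enable verbose logging (optional)")
  }

  // None when a required argument is missing or an unknown flag is passed;
  // scopt reports the problem on stderr in that case
  def parse(args: Seq[String]): Option[UsageConfig] =
    parser.parse(args, UsageConfig())
}
```

Because parse returns an Option, the calling code has to decide explicitly what happens on bad input, for example by exiting before any SparkSession is started.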
src/main/scala/thw/vancann/storage/Storage.scala should define all I/O of your application. There should be no hidden reads and writes; everything should go through one place so that I/O stays consolidated. Note that for this structure we will always assume the use of Datasets typed as proper case classes, as one should.
Storage is defined as a trait, which makes it trivial to hook in a different storage implementation for separate use cases or when switching cloud providers, making the code more modular.
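As a rough sketch of the idea, a Storage trait with one file-based implementation might look like this. The case classes Line and WordCountRow, the method names lines and writeWordCounts, the constructor parameters and the choice of Parquet as output format are all assumptions for illustration, not the repository's actual code.

```scala
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

// Hypothetical record types, for illustration only
case class Line(value: String)
case class WordCountRow(word: String, count: Long)

// Every read and write of the application is declared here, and nowhere else
trait Storage {
  def lines: Dataset[Line]
  def writeWordCounts(ds: Dataset[WordCountRow]): Unit
}

// One possible implementation, backed by the local file system
class LocalStorage(spark: SparkSession, inputPath: String, outputPath: String) extends Storage {
  import spark.implicits._

  // spark.read.text yields a single string column named "value", matching Line
  override def lines: Dataset[Line] =
    spark.read.text(inputPath).as[Line]

  override def writeWordCounts(ds: Dataset[WordCountRow]): Unit =
    ds.write.mode(SaveMode.Overwrite).parquet(outputPath)
}
```

A second implementation, say for a cloud bucket, would only need to extend the same trait; the jobs themselves would not change.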
An example Spark job is provided in src/main/scala/thw/vancann/WordCount.scala. Note that this job separates I/O from the actual logic.
The run function reads in all sources needed for this job; this function should have NO LOGIC besides I/O. The magic happens in the transform function, inspired by Spark Transformers. It takes one or multiple Datasets plus auxiliary arguments, transforms the data as needed and returns the result.
The transform function and anything it calls SHOULD NOT do any I/O or have side effects in general. This makes it trivial to test all functions without relying on external data sources or stubbing databases. Yes, this requires discipline, but it will pay off!
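The split between run and transform can be sketched as follows. This is my own minimal version under stated assumptions, not a copy of the repository's WordCount: the case classes, the method signatures and the small Storage stand-in (included only so the snippet compiles on its own) are hypothetical.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record types, for illustration only
case class Line(value: String)
case class WordCountRow(word: String, count: Long)

// Minimal stand-in for the Storage trait so that this snippet is self-contained
trait Storage {
  def lines: Dataset[Line]
  def writeWordCounts(ds: Dataset[WordCountRow]): Unit
}

object WordCount {
  // I/O only: read, delegate to transform, write. No logic lives here.
  def run(spark: SparkSession, storage: Storage): Unit =
    storage.writeWordCounts(transform(spark, storage.lines))

  // Logic only: no reads, no writes, no other side effects
  def transform(spark: SparkSession, lines: Dataset[Line]): Dataset[WordCountRow] = {
    import spark.implicits._
    lines
      .flatMap(line => line.value.toLowerCase.split("\\s+"))
      .filter(word => word.nonEmpty)
      .groupByKey(identity)
      .count()
      .map { case (word, count) => WordCountRow(word, count) }
  }
}
```

Since transform only depends on its arguments, a test can feed it a small in-memory Dataset and compare the collected result, with no files or databases involved.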
Each Spark job should only write to one destination and generally do only one thing, i.e. transform and/or combine some data, pull from an API, create a model (using SparkML or H2O), apply a model or do some ingestion. It is fine, and usually necessary, to read from multiple sources.
It is better to have more jobs doing smaller things than a big job doing lots of things! Smaller jobs are easier to test, easier to modify and certainly easier to debug.
For testing, one usually needs a Spark session available in each test. To reuse the same session instead of starting a new one for each test class, src/test/scala/thw/vancann/SharedSparkSession.scala is provided. By importing this singleton into each test, the same session is reused throughout tests and suites.
An example for such a test is provided in
FlatSpec from scalatest as teststyle and test library respectively.
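A shared session and a test that uses it could look roughly like the sketch below. It assumes scalatest 3.0.x, where FlatSpec lives in org.scalatest; the master setting, the disabled UI and the test class UpperCaseTest are my own illustrative choices, not necessarily what the repository does.

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.{FlatSpec, Matchers}

// A Scala object is initialized once, on first use, so every suite that
// imports it gets the same SparkSession
object SharedSparkSession {
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("unit-tests")
    .config("spark.ui.enabled", "false") // no web UI needed while testing
    .getOrCreate()
}

// Hypothetical test, for illustration only
class UpperCaseTest extends FlatSpec with Matchers {
  import SharedSparkSession.spark
  import spark.implicits._

  "A Dataset transformation" should "be testable without external I/O" in {
    val input = Seq("spark", "scala").toDS()
    val result = input.map(word => word.toUpperCase).collect()
    result should contain theSameElementsAs Seq("SPARK", "SCALA")
  }
}
```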
Any I/O tests generally go in the src/it folder to separate the unit tests from the integration tests. An integration test that reads a file is provided in src/it/scala/thw/vancann/WordCountTest.scala. Note again the usefulness of the Storage trait, as we can easily implement a LocalStorage to read from the resources folder, removing the need for mocking / stubbing.
Continuous Integration (CI) and Continuous Delivery (CD)
Continuous Integration (CI) is a good way to make sure that a push to a branch does not break any code or violate the style guide, and to hold everyone to the agreements made by your team.
A CI setup for GitLab is provided. GitLab lets you run CI and CD for free as long as you configure your own runner, which is quite easy and very well documented.
.gitlab-ci.yml describes how to run three stages in a pristine docker container.
(1) Tests and code coverage
Run both the unit tests and integration tests and measure the code coverage using scoverage which is added as plugin in
coverageExcludedPackages := ";.*storage.*"
coverageMinimum := 70
coverageFailOnMinimum := true
These lines in build.sbt define some settings for scoverage. These particular settings set a minimum coverage of 70 percent and fail the build if the coverage report finds that coverage falls below this number.
Check for predefined style violations defined in
scalastyle-config.xml. For more information have a look at scalastyle.
scalastyleFailOnWarning := false
scalastyleFailOnError := true
These lines in build.sbt define if and when scalastyle should fail the build.
(3) Generate scaladocs and publish the project
For this last step a few additional parameters need to be uncommented in build.sbt. Have a look at the sbt documentation to see what these lines do.
Note that starting a fresh Docker container for each stage is quite slow, as all sbt dependencies need to be pulled in every time. A slightly more advanced and significantly faster solution is to prebuild a Docker image that already contains the dependencies defined in build.sbt in its .ivy2 repository folder.
There you have it. A project to bootstrap your spark application! Feel free to tweak, add and remove as you please, but do take note of the best practices proposed:
- Separate concerns
- No side-effects
- A spark job should generally have only one output, restricting the job to do only one thing.
- Think modular!