This article covers:
- SBT setup
- Implementing a Spark job trait that properly reads command-line arguments and creates a SparkSession
- Testing Spark jobs and functions
- CI/CD
Why?
More often than not, I see companies and their employees struggling to find a good Spark application structure. Code is filled with side effects, such as mixing I/O with logic, which makes the Spark logic impossible to unit test. Without a structure that properly separates concerns, modularity is destroyed and technical debt piles up.
These principles are hard to follow in general, and using a computation framework such as Spark does not make it any easier, since each job usually boils down to input -> transform -> output.
This post proposes a structure and some best practices that try to address these issues.
Getting started
To get started and have a look at the project structure, clone the repository on GitHub.
SBT setup
The first few lines in build.sbt are the most important ones to bootstrap your application. They contain the project name and the Spark dependencies.
These lines define the name, version and organization of your project and are needed to upload a successful build to a binary store (more on that later). We use scalaVersion 2.11.11, since that is the version Spark is compiled against, and as of writing the latest available Spark version is 2.2.0.
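Concretely, the top of build.sbt could look roughly like this; the name, version and organization values are placeholders for illustration, only the scalaVersion is dictated by Spark:

name := "spark-best-practices"  // placeholder: your project name
version := "0.1.0-SNAPSHOT"     // placeholder
organization := "thw.vancann"   // placeholder, matching the package names used below
scalaVersion := "2.11.11"       // the Scala version Spark 2.2.0 is compiled against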
The dependencies make sure that Spark is available on the classpath for compilation; the scope is Provided, however, as we assume that wherever we deploy our application a Spark cluster is already running. Scopt is a very useful library that helps with reading arguments from the command line; we pull that package in at the default scope, Compile. Last but very important, scalatest is pulled in to help write proper unit tests and integration tests. Since this library is only needed for testing, its scope is limited to test and it. The latter must be enabled before the it command is recognized, which is done by these two lines:
Defaults.itSettings
lazy val root = project.in(file(".")).configs(IntegrationTest)
This enables the command
$ sbt it:test
to run all integration tests under the src/it/ folder.
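For reference, the dependency declarations described above could look roughly like this in build.sbt (the group IDs are the usual ones; the scopt and scalatest versions are assumptions):

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  // Provided: the cluster already ships Spark, so keep it out of the fat jar
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql"  % sparkVersion % Provided,

  // Compile (default) scope: packed into the assembly
  "com.github.scopt" %% "scopt" % "3.7.0",

  // Only needed for unit and integration tests
  "org.scalatest" %% "scalatest" % "3.0.4" % "test,it"
)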
The lines
Test / testOptions += Tests.Argument("-oD")
IntegrationTest / testOptions += Tests.Argument("-oD")
are helpful utility lines that enable time measurement for each test, so that sbt can print the durations in the test report.
Assembly
Since we’re deploying the application on some cluster, all dependencies that are not available on the cluster’s classpath must be packed into the jar. For that, the sbt-assembly plugin is used. The project/assembly.sbt file consists of a single line:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
which enables the command
$ sbt assembly
This command will run all your tests and package all Compile scope libraries into the jar, creating a fat jar.
For more information, refer to the GitHub page of sbt-assembly.
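Once assembled, the fat jar can be submitted to a running cluster with spark-submit, for example (the jar name below depends on your name and version settings and is only illustrative):
$ spark-submit --class thw.vancann.WordCount target/scala-2.11/spark-best-practices-assembly-0.1.0-SNAPSHOT.jar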
Spark jobs
The main Spark trait is src/main/scala/thw/vancann/SparkJob.scala. It essentially does two things:
- Read in and parse any optional and required command line arguments into a case class
- Start a SparkSession, initialize a Storage object and call the run function
The only thing actual jobs need to do is implement the functions appName and run.
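As a rough sketch, the trait could look like the following. Only appName and run are taken from the description above; the exact signatures, the UsageConfig and Storage types (sketched in the next sections) and the hypothetical HdfsStorage are assumptions:

import org.apache.spark.sql.SparkSession

trait SparkJob {

  // The two members every concrete job implements
  def appName: String
  def run(spark: SparkSession, storage: Storage, config: UsageConfig): Unit

  def main(args: Array[String]): Unit =
    // 1. Parse optional and required command line arguments into a case class
    UsageConfig.parser.parse(args, UsageConfig()) match {
      case None => sys.exit(1) // scopt has already printed the usage text
      case Some(config) =>
        // 2. Start a SparkSession, initialize a Storage and hand over to run
        val spark = SparkSession.builder().appName(appName).getOrCreate()
        run(spark, HdfsStorage(spark), config) // HdfsStorage: hypothetical production Storage
        spark.stop()
    }
}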
Scopt
src/main/scala/thw/vancann/UsageConfig.scala is the file containing the UsageConfig case class and the parser that parses the command line arguments. For more information about scopt, please refer to their GitHub page.
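A minimal version of such a case class and parser could look like this; the fields are made up for illustration, the real UsageConfig defines whatever arguments the jobs actually need:

case class UsageConfig(inputPath: String = "", outputPath: String = "")

object UsageConfig {
  // scopt parser mapping command line flags onto the case class
  val parser = new scopt.OptionParser[UsageConfig]("spark-best-practices") {
    opt[String]('i', "input")
      .required()
      .action((value, config) => config.copy(inputPath = value))
      .text("path to the input data")

    opt[String]('o', "output")
      .required()
      .action((value, config) => config.copy(outputPath = value))
      .text("path to write the result to")
  }
}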
Storage
The file src/main/scala/thw/vancann/storage/Storage.scala should define all I/O of your application. There should be no hidden reads and writes; everything should go through one place so that consolidation can be achieved. Note that for this structure we will always assume the use of Datasets typed as proper case classes, as one should.
Storage is defined as a trait; this makes it trivial to hook in a different storage implementation for separate use cases or when switching cloud providers, making the code slightly more modular.
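A sketch of such a trait, assuming hypothetical Line and WordFrequency case classes for the word count job described below, could look like this:

import org.apache.spark.sql.Dataset

// Hypothetical domain types, typed as proper case classes
case class Line(value: String)
case class WordFrequency(word: String, count: Long)

// Every read and write of the application goes through this single trait,
// so switching storage backends only means providing another implementation.
trait Storage {
  def readLines: Dataset[Line]
  def writeWordFrequencies(counts: Dataset[WordFrequency]): Unit
}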
Example Job
An actual example Spark job is provided in src/main/scala/thw/vancann/WordCount.scala. Note that this job separates I/O from the actual logic.
The run function reads in all sources needed for the job; this function should have NO LOGIC besides I/O. The magic happens in the transform function, inspired by Spark Transformers. It takes one or multiple Datasets plus auxiliary arguments, transforms the data as needed and returns the resulting Dataset.
Anything at the level of the transform function or below SHOULD NOT do any I/O or have side effects in general. This makes it trivial to test all functions without having to rely on external data sources or stubbing databases. Yes, this requires discipline, but it will pay off!
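Combined with the sketches above, the shape of such a job could be roughly the following; the word splitting logic is illustrative, not the repository’s exact implementation:

import org.apache.spark.sql.{Dataset, SparkSession}

object WordCount extends SparkJob {

  override def appName: String = "word-count"

  // I/O only: read every source the job needs, delegate to transform, write the result
  override def run(spark: SparkSession, storage: Storage, config: UsageConfig): Unit = {
    val lines = storage.readLines
    val counts = transform(spark, lines)
    storage.writeWordFrequencies(counts)
  }

  // Pure logic, no I/O and no side effects: Datasets (plus auxiliary arguments) in, Dataset out
  def transform(spark: SparkSession, lines: Dataset[Line]): Dataset[WordFrequency] = {
    import spark.implicits._
    lines
      .flatMap(_.value.toLowerCase.split("\\s+"))
      .filter(_.nonEmpty)
      .groupByKey(identity)
      .count()
      .map { case (word, count) => WordFrequency(word, count) }
  }
}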
Each Spark job should only write to one destination and generally do only one thing, i.e. transform and/or combine some data, pull from an API, create a model (using SparkML or H2O), apply a model or do some ingestion. It is fine – and usually necessary – to read from multiple sources.
It is better to have more jobs doing smaller things than a big job doing lots of things! Smaller jobs are easier to test, easier to modify and certainly easier to debug.
Testing
For testing, one usually needs a Spark session available in each test. To reuse the same session, and to avoid starting a new one for every test class, a SharedSparkSession is provided in src/test/scala/thw/vancann/SharedSparkSession.scala. By importing this singleton into each test, the same session is reused across tests and suites.
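A minimal version of such a shared session could look like this; the master and config values are just sensible defaults for local testing:

import org.apache.spark.sql.SparkSession

// One lazily created local session, reused by every suite that imports it
object SharedSparkSession {

  lazy val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("test")
    .config("spark.sql.shuffle.partitions", "1") // keep local tests fast
    .getOrCreate()
}

Each suite then brings the session into scope with import SharedSparkSession.spark.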
An example of such a test is provided in src/test/scala/thw/vancann/WordCountTest.scala, using scalatest as the test library and its FlatSpec style.
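Building on the sketches above, a unit test for the transform function could look roughly like this:

import org.scalatest.{FlatSpec, Matchers}

class WordCountTest extends FlatSpec with Matchers {

  import SharedSparkSession.spark
  import spark.implicits._

  "transform" should "count each word per occurrence" in {
    val lines = Seq(Line("hello world"), Line("hello spark")).toDS()

    val result = WordCount.transform(spark, lines).collect()

    result should contain allOf (
      WordFrequency("hello", 2),
      WordFrequency("world", 1),
      WordFrequency("spark", 1)
    )
  }
}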
Any I/O tests generally go in the src/it folder, to separate the unit tests from the integration tests. An integration test that reads a file is provided in src/it/scala/thw/vancann/WordCountTest.scala. Note again the usefulness of the Storage trait: we can easily implement a LocalStorage that reads from the resources folder, removing the need for mocking or stubbing.
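For illustration, such a LocalStorage could read its input from a file on the test classpath; the file name and the no-op write are assumptions:

import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical Storage implementation for integration tests: reads from the
// local resources folder instead of the real data store.
class LocalStorage(spark: SparkSession) extends Storage {
  import spark.implicits._

  override def readLines: Dataset[Line] =
    spark.read
      .textFile(getClass.getResource("/lines.txt").getPath) // hypothetical fixture in src/it/resources
      .map(Line(_))

  override def writeWordFrequencies(counts: Dataset[WordFrequency]): Unit =
    () // nothing to verify on write here; a temp directory would also work
}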
Continuous Integration (CI) and Continuous Delivery (CD)
To make sure each push to a branch does not break any code or style guides, Continuous Integration (CI) is a good way to catch each culprit and to adhere to the agreements made by your team.
A way to do CI on GitLab is provided; GitLab allows you to run CI and CD for free as long as you configure your own runner, which is quite easy and very well documented at https://docs.gitlab.com/ee/ci/runners/README.html.
The file .gitlab-ci.yml describes how to run three stages in a pristine docker container.
(1) Tests and code coverage
Run both the unit tests and the integration tests, and measure the code coverage using scoverage, which is added as a plugin in project/plugins.sbt. The lines
coverageExcludedPackages := ";.*storage.*"
coverageMinimum := 70
coverageFailOnMinimum := true
in build.sbt define some settings for scoverage. These particular settings set a minimum coverage of 70% and let the build fail if the coverage report shows that the coverage falls below this number.
(2) Scalastyle
Check for predefined style violations defined in scalastyle-config.xml. For more information, have a look at scalastyle.
The lines
scalastyleFailOnWarning := false
scalastyleFailOnError := true
in build.sbt define if and when scalastyle should fail the build.
(3) Generate scaladocs and publish the project
For this last step, a few additional parameters need to be uncommented in build.sbt. Have a look at the sbt documentation to see what these lines do.
Note that starting a fresh docker container for each stage is quite slow, as all sbt dependencies need to be pulled in every time. A slightly more advanced and significantly faster solution is to prepack a docker image that already contains the dependencies defined in build.sbt in the image’s .ivy2 repository folder.
Final notes
There you have it: a project to bootstrap your Spark application! Feel free to tweak, add and remove as you please, but do take note of the best practices proposed:
- Separate concerns
- No side-effects
- A Spark job should generally have only one output, restricting the job to doing only one thing.
- Think modular!
Want to expand your Apache Spark knowledge?
Join us for the three-day training course Data Science with Spark. The course teaches you everything you need to know to use Apache Spark to perform data science at scale with ease!