Blog

SparkR just got better

09 Sep, 2015
Xebia Background Header Wave

In this document I’ll give a quick review of some exiting new features for SparkR from the new 1.5 version that had it’s official release on September 9th 2015.

Rstudio provisioned

<img alt="" src="RStudio Logo Blue Gradient" />

Starting a full start SparkR service on AWS just got a whole lot easier. It takes about 15 minutes and only requires you to do three simple things.

Step 1: Command line

If you have your Amazon .pem files in order on your machine as well as your AWS credentials in your .bash_profile you can use the command line app in the spark-ec2 folder to start up a cluster on Amazon.

From this folder, run this command;

./spark-ec2 
  --key-pair=pems 
  --identity-file=/path/pems.pem 
  --region=eu-west-1 
  -s 4 
  --instance-type c3.xlarge 
  --copy-aws-credentials
  launch my-spark-cluster

This example will start a cluster with 4 slave nodes of type c3.xlarge in the eu-west-1 region. The name of the cluster is ‘my-spark-cluster’. If you want a bigger cluster, you can set the preferences here.

Step 2: Ssh + set password

The same command line app allows you to ssh into the master machine of the spark cluster.

./spark-ec2 -k pems-df -i /path/pems.pem --region=eu-west-1 login my-spark-cluster

Once you are logged in, you’ll be logged in as the root user. For security reasons it is more preferable to have a seperate user for Rstudio. This user has already been added to the system, the only thing you need to do is assign a password to it.

passwd rstudio

Step 3: Login

First, just check where the master server is located.

> curl icanhazip.com

Then fill in the link in your favorite browser and log in with the ‘Rstudio’ user and the password that you’ve set.

You can use the startSpark.R script to quickly get started. It creates a sparkContext and a sqlContext which you can then use to create distributed dataframes on your cluster.

When you are done

If you are done with the cluster and saved all your files back to s3, you don’t need to pay for the ec2 machines anymore. The cluster can then safely be destroyed via the same command line app as before.

./spark-ec2 -k pems-df -i /path/pems.pem --region=eu-west-1 destroy my-spark-cluster

Linear Models

Regression Example

ddf  createDataFrame(sqlContext, ChickWeight)
ddf %>%
  summary %>%
  collect
Result
  summary             weight               Time              Chick
1   count                578                578                578
2    mean 121.81833910034602 10.717993079584774 25.750865051903116
3  stddev  71.01045205007217  6.752550828025898   14.5561866041844
4     min               35.0                0.0                  1
5     max              373.0               21.0                  9
                Diet
1                578
2  2.235294117647059
3 1.1616716269489116
4                  1
5                  4

In this new SparkR shell you will notice that the glm method now comes from multiple packages.

?glm
Help on topic ‘glm’ was found in the following packages:

  Package               Library
  SparkR                /Users/code/Downloads/spark-1.5.0-bin-hadoop2.6/R/lib
  stats                 /Library/Frameworks/R.framework/Versions/3.2/Resources/library

You can run the model and save its parameters in a variable, just like in normal R.

dist_mod  glm(weight ~ Time + Diet, data = ddf, family = "gaussian")

To view the characteristics of the regression you’ll only need to run it through the summary function (a pattern common for many things in R).

dist_mod %>% summary
Result
$coefficients
              Estimate
(Intercept)  41.157845
Time          8.750492
Diet__1     -30.233456
Diet__2     -14.067382
Diet__3       6.265952

This model can be applied to new data to create predictions, very much in the same style as you would in normal R models/dataframes.

dist_mod %>%
  predict(newData = ddf) %>%
  showDF
Result
+------+----+-----+----+------------------+-----+------------------+
|weight|Time|Chick|Diet|          features|label|        prediction|
+------+----+-----+----+------------------+-----+------------------+
|  42.0| 0.0|    1|   1|     (4,[1],[1.0])| 42.0|10.924388954283739|
|  51.0| 2.0|    1|   1| [2.0,1.0,0.0,0.0]| 51.0| 28.42537280265673|
|  59.0| 4.0|    1|   1| [4.0,1.0,0.0,0.0]| 59.0| 45.92635665102972|
|  64.0| 6.0|    1|   1| [6.0,1.0,0.0,0.0]| 64.0| 63.42734049940272|
|  76.0| 8.0|    1|   1| [8.0,1.0,0.0,0.0]| 76.0| 80.92832434777571|
|  93.0|10.0|    1|   1|[10.0,1.0,0.0,0.0]| 93.0| 98.42930819614871|
| 106.0|12.0|    1|   1|[12.0,1.0,0.0,0.0]|106.0|115.93029204452169|
| 125.0|14.0|    1|   1|[14.0,1.0,0.0,0.0]|125.0| 133.4312758928947|
| 149.0|16.0|    1|   1|[16.0,1.0,0.0,0.0]|149.0|150.93225974126767|
| 171.0|18.0|    1|   1|[18.0,1.0,0.0,0.0]|171.0|168.43324358964065|
| 199.0|20.0|    1|   1|[20.0,1.0,0.0,0.0]|199.0|185.93422743801366|
| 205.0|21.0|    1|   1|[21.0,1.0,0.0,0.0]|205.0|194.68471936220016|
|  40.0| 0.0|    2|   1|     (4,[1],[1.0])| 40.0|10.924388954283739|
|  49.0| 2.0|    2|   1| [2.0,1.0,0.0,0.0]| 49.0| 28.42537280265673|
|  58.0| 4.0|    2|   1| [4.0,1.0,0.0,0.0]| 58.0| 45.92635665102972|
|  72.0| 6.0|    2|   1| [6.0,1.0,0.0,0.0]| 72.0| 63.42734049940272|
|  84.0| 8.0|    2|   1| [8.0,1.0,0.0,0.0]| 84.0| 80.92832434777571|
| 103.0|10.0|    2|   1|[10.0,1.0,0.0,0.0]|103.0| 98.42930819614871|
| 122.0|12.0|    2|   1|[12.0,1.0,0.0,0.0]|122.0|115.93029204452169|
| 138.0|14.0|    2|   1|[14.0,1.0,0.0,0.0]|138.0| 133.4312758928947|
+------+----+-----+----+------------------+-----+------------------+

A small GOTYA

You could compare the SparkR model results from the ChickWeight regression with the results from the normal R lm method. What you see might scare you a bit.

df  ChickWeight
loc_mod  glm(weight ~ Time + Diet, data = df)
loc_mod %>% summary
Results
Coefficients:
            Estimate Std. Error t value Pr(>|t|)
(Intercept)  10.9244     3.3607   3.251  0.00122 **
Time          8.7505     0.2218  39.451   2e-16 ***
Diet2        16.1661     4.0858   3.957 8.56e-05 ***
Diet3        36.4994     4.0858   8.933   2e-16 ***
Diet4        30.2335     4.1075   7.361 6.39e-13 ***

Compare this output with the distributed glm output. You might be frightened slightly at this point. The two models give very different output!

Don’t be scared just yet. The way that Spark has implemented machine learning is different, however it is still doing proper regression.

The main difference is that R-glm translates the Diet1 variable to be the constant intercept whereas the SparkR-glm translates Diet4. The only difference therefore is a linear transformation of the model, the prediction outcomes should still be the same. Another way to confirm this is to notice that the difference between Diet2 and Diet3 is the same in both models and the parameter for Time is also the same.

loc_mod %>% predict(df) %>% head(20)
Result
        1         2         3         4         5         6         7         8
 10.92439  28.42537  45.92636  63.42734  80.92833  98.42931 115.93029 133.43128
        9        10        11        12        13        14        15        16
150.93226 168.43324 185.93423 194.68472  10.92439  28.42537  45.92636  63.42734
       17        18        19        20
 80.92833  98.42931 115.93029 133.43128

Classification Example

By turning the family parameter from "gaussian" to "binomial", we turn the linear regression into a logistic regression.

df  data.frame(a = c(1,1,1,0,0,0), b = c(6,7,8,1,2,3))
ddf  createDataFrame(sqlContext, df)
mod  glm(a ~ b, data=ddf, family="binomial")
mod %>%
  predict(newData = ddf) %>%
  showDF
Result
+---+---+--------+-----+--------------------+--------------------+----------+
|  a|  b|features|label|       rawPrediction|         probability|prediction|
+---+---+--------+-----+--------------------+--------------------+----------+
|1.0|6.0|   [6.0]|  1.0|[-16.405749578003...|[7.50021053099107...|       1.0|
|1.0|7.0|   [7.0]|  1.0|[-27.201563029171...|[1.53642468901809...|       1.0|
|1.0|8.0|   [8.0]|  1.0|[-37.997376480340...|[3.14737918119432...|       1.0|
|0.0|1.0|   [1.0]|  0.0|[37.5733176778398...|[1.0,4.8096720602...|       0.0|
|0.0|2.0|   [2.0]|  0.0|[26.7775042266712...|[0.99999999999765...|       0.0|
|0.0|3.0|   [3.0]|  0.0|[15.9816907755026...|[0.99999988538542...|       0.0|
+---+---+--------+-----+--------------------+--------------------+----------+

The logistic version of glm unfortunately doesn’t give us a nice summary output at the moment. This is a known missing feature and should become available as of Spark 1.6. Another problem is that SparkR currently does not seem to support strings; which means that all classification tasks need to be cast to integers manually.

names(iris)  c("sepal_length","sepal_width","petal_length","petal_width","species")

ddf  sqlContext %>%
  createDataFrame(iris) %>%
  withColumn("to_pred", .$species == "setosa")

glm(to_pred ~ sepal_length + sepal_width + petal_length + petal_width, family = "binomial", data = ddf) %>%
  predict(newData = ddf) %>%
  showDF

Note that Spark doesn’t like points in column names, which is why they are manually reset here.

Result
+-------+-------+-------+-------+-------+-------+ ... +----------+
|sep_len|sep_wid|pet_len|pet_wid|species|to_pred| ... |prediction|
+-------+-------+-------+-------+-------+-------+ ... +----------+
|    5.1|    3.5|     1.4|   0.2| setosa|   true| ... |       1.0|
|    4.9|    3.0|     1.4|   0.2| setosa|   true| ... |       1.0|
|    4.7|    3.2|     1.3|   0.2| setosa|   true| ... |       1.0|
|    4.6|    3.1|     1.5|   0.2| setosa|   true| ... |       1.0|
|    5.0|    3.6|     1.4|   0.2| setosa|   true| ... |       1.0|
|    5.4|    3.9|     1.7|   0.4| setosa|   true| ... |       1.0|
|    4.6|    3.4|     1.4|   0.3| setosa|   true| ... |       1.0|
|    5.0|    3.4|     1.5|   0.2| setosa|   true| ... |       1.0|
|    4.4|    2.9|     1.4|   0.2| setosa|   true| ... |       1.0|
|    4.9|    3.1|     1.5|   0.1| setosa|   true| ... |       1.0|
|    5.4|    3.7|     1.5|   0.2| setosa|   true| ... |       1.0|
|    4.8|    3.4|     1.6|   0.2| setosa|   true| ... |       1.0|
|    4.8|    3.0|     1.4|   0.1| setosa|   true| ... |       1.0|
|    4.3|    3.0|     1.1|   0.1| setosa|   true| ... |       1.0|
|    5.8|    4.0|     1.2|   0.2| setosa|   true| ... |       1.0|
|    5.7|    4.4|     1.5|   0.4| setosa|   true| ... |       1.0|
|    5.4|    3.9|     1.3|   0.4| setosa|   true| ... |       1.0|
|    5.1|    3.5|     1.4|   0.3| setosa|   true| ... |       1.0|
|    5.7|    3.8|     1.7|   0.3| setosa|   true| ... |       1.0|
|    5.1|    3.8|     1.5|   0.3| setosa|   true| ... |       1.0|
+-------+-------+-------+-------+-------+-------+ ... +----------+
only showing top 20 rows

More Advanced Types and Queries

A few nice features were added in Spark 1.5 in regards to types and operations in distributed data frames.

The %in% operator

This was definitely missing in Spark 1.4 but now you can do more complex queries via the %in% operator.

ddf  createDataFrame(sqlContext, ChickWeight)
ddf %>%
  filter(.$Diet %in% c("3","4")) %>%
  sample(TRUE, 0.05) %>%
  collect
Result
   weight Time Chick Diet
1      87    6    35    3
2      48    2    37    3
3     109   10    38    3
4     232   18    38    3
5      66    4    40    3
6     215   16    40    3
7     155   12    41    4
8     204   16    42    4
9     198   18    43    4
10    101    8    46    4

Date types

You can now start playing around with dates in SparkR.

df  data.frame(
  date = as.Date("2015-04-01") + 0:99,
  r = runif(100)
)

ddf  createDataFrame(sqlContext, df)
ddf %>% printSchema
Result
root
 |-- date: date (nullable = true)
 |-- r: double (nullable = true)

You can filter these dates by using date types.

ddf %>% 
  filter(.$date > as.Date("2015-04-03")) %>% 
  filter(.$date % 
  collect
Result
        date         r
1 2015-04-04 0.5821896
2 2015-04-05 0.9939826
3 2015-04-06 0.4792869
4 2015-04-07 0.3411329

SparkR even has some lubridate-ish support for date manipulation.

ddf %>% 
  withColumn("dom", .$date %>% dayofmonth) %>% 
  withColumn("doy", .$date %>% dayofyear) %>% 
  withColumn("woy", .$date %>% weekofyear) %>% 
  head
Result
        date         r dom doy woy
1 2015-04-01 0.2736077   1  91  14
2 2015-04-02 0.2015249   2  92  14
3 2015-04-03 0.3586754   3  93  14
4 2015-04-04 0.6162447   4  94  14
5 2015-04-05 0.5220081   5  95  14
6 2015-04-06 0.4814839   6  96  15

You can do similar things with datetimes.

df  data.frame(
  t = as.POSIXct("2015-01-01 00:00:00") + runif(1000)*10000,
  r = runif(1000)
)

ddf  createDataFrame(sqlContext, df)
ddf %>% printSchema
Result
root
 |-- t: timestamp (nullable = true)
 |-- r: double (nullable = true)

Besides dates, we’ve also got some lubri-time methods in SparkR now.

ddf %>% 
  withColumn("hour", .$t %>% hour) %>% 
  withColumn("minute", .$t %>% minute) %>% 
  withColumn("unix_t", .$t %>% unix_timestamp) %>% 
  head
Result
                    t         r hour minute     unix_t
1 2015-01-01 01:08:09 0.1484953    1      8 1420070889
2 2015-01-01 02:27:22 0.6907954    2     27 1420075642
3 2015-01-01 01:11:24 0.6616176    1     11 1420071084
4 2015-01-01 00:11:06 0.9897747    0     11 1420067466
5 2015-01-01 02:07:13 0.2923660    2      7 1420074433
6 2015-01-01 01:06:16 0.6781178    1      6 1420070776

For most dataframe operations, this is nice to start playing with, but don’t expect the full flexibility of base R just yet. Right now dates and datetimes cannot be summarized and they cannot be used in models. This is a current known issue in Jira that people are working on.

Levenshtein

These might fall in the special usecase category, but it can be surprisingly handy when trying to find similar names in a large databank.

df  data.frame(
  a = c('saark', 'spork', 'siaru', 'soobk'),
  b = c('spark', 'spark', 'spark', 'spark')
)

ddf  createDataFrame(sqlContext, df)
ddf$c  levenshtein(ddf$a, ddf$b)
ddf %>% head
Results
      a     b c
1 saark spark 1
2 spork spark 1
3 siaru spark 2
4 soobk spark 3

Regex

Again, regexes are a bit of a special use case to a lot of R users but they are invaluable when analyzing log files.

df  data.frame(
  s = c('100-202', '2a4ta24', '300-200', 't2t2t2')
)

ddf  createDataFrame(sqlContext, df)

ddf %>%
  withColumn('regex1', regexp_extract(.$s, "(\d+)-(\d+)", 1)) %>%
  withColumn('regex2', regexp_extract(.$s, "(\d+)-(\d+)", 2)) %>%
  withColumn('regex3', regexp_replace(.$s, "(\d+)-(\d+)", "HERE!")) %>%
  head
Results
        s regex1 regex2  regex3
1 100-202    100    202   HERE!
2 2a4ta24               2a4ta24
3 300-200    300    200   HERE!
4  t2t2t2                t2t2t2

The future

Spark could use some more features, but the recent additions already grant many usecases. Spark is a project with enormous traction and has a new release every 3 months, so you can expect more to come.

Being able to work with a distributed dataframe in a dplyr-like syntax really opens up doors for R users who want to handle larger datasets. The fact that all of this can run on Amazon cheaply adds to the benefit.

Back to Blog overview

Questions?

Get in touch with us to learn more about the subject and related solutions

Explore related posts