Update, 2019. I have now written an updated post on cross-validation with sparklyr, as well as a follow-up on using cross-validation for feature selection. Those posts are the better ones to read, as the code here no longer works following changes to sparklyr.

In this post I’m going to run through a brief example of using sparklyr in R. This package provides a way to connect to Spark from within R, while using the dplyr functions we all know and love. I was entirely new to Spark, and databases in general, before having a play with sparklyr. Seemingly its main rival is the more mature SparkR package. Interestingly, SparkR has a number of functions that look very similar to dplyr (e.g. SparkR::select()) but behave slightly differently. Indeed, if we load in SparkR after dplyr we get this message.

The following objects are masked from ‘package:dplyr’:

    arrange, between, coalesce, collect, contains, count, cume_dist, dense_rank, desc, distinct, explain, filter, first, group_by, intersect,
    lag, last, lead, mutate, n, n_distinct, ntile, percent_rank, rename, row_number, sample_frac, select, sql, summarize, union

If you’re a dplyr fanatic like me this looks a bit worrying at first sight – how can I R without my dplyr verbs?! The fact that the SparkR equivalents of dplyr verbs behave slightly differently could also be a bit confusing. For example, to select a column with SparkR::select() you have to use either "column_name" or df$column_name. In other words, the familiar df %>% select(column_name) won’t work. This is going to be a bit surprising if you’ve become accustomed to dplyr’s non-standard evaluation. Nevertheless, SparkR is clearly the more mature package. That said, with RStudio behind it, we can expect sparklyr to be well supported going forward.
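
To make the difference concrete, here’s a quick sketch (df_sparkr and df_tbl are hypothetical names for the same data held as a SparkR SparkDataFrame and a sparklyr tbl_spark respectively):

# With SparkR, select() wants a quoted column name or a column reference
SparkR::select(df_sparkr, "age")
SparkR::select(df_sparkr, df_sparkr$age)

# With sparklyr, the familiar dplyr form works on a tbl_spark
df_tbl %>% select(age)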

Below I run through manipulating a Spark DataFrame from within R, as well as how to run the machine learning models offered by MLlib on a Spark cluster from R. At the end I provide a function for doing k-fold cross-validation for any of the classifiers in MLlib.

Set up

library(tidyverse)
library(sparklyr)
library(caret)

knitr::opts_chunk$set(message = FALSE, warning = FALSE)

# set up spark connection
sc <- spark_connect(master = "local")

For this example I’m using some data from the UCI Machine Learning Repository. The data is from a direct marketing campaign of “a Portuguese banking institution”. The outcome of interest is whether customers subscribed to some product (see here). This dataset isn’t exactly thrilling, but it’s small enough that none of the code takes too long to run.

# Download the zipped folder
download.file("https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip", "../../static/data/bank.zip")

# unzip the folder
unzip(zipfile = "../../static/data/bank.zip",
      exdir = "../../static/data")

df_raw <- read_delim("../../static/data/bank.csv", delim = ";") %>%
  map_if(is.character, as.factor) %>%
  as_data_frame()

The first thing we need to do is turn all the factors into integers, as this is how MLlib likes it. Further, the labels for a categorical outcome have to be in \(\{0,1\}\). Finally, the data is copied to the Spark cluster to reflect a situation where we’re dealing with data already in Spark.

df <- df_raw %>%
  map_if(is.factor, as.integer) %>%
  as_data_frame() %>%
  mutate(y = y - 1) %>% # because labels must be in {0,1} for MLlib
  copy_to(sc, ., name = "df") # copy our R data frame onto spark

The R object df is of class tbl_spark and represents a connection to a Spark DataFrame. We can do various manipulations to this object in R and then compute() to create a new table on the cluster. We can also collect() one of these tbl_sparks to get the data locally into R.
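
For example, a minimal sketch (the table name "df_subscribers" and the particular filter are just for illustration):

# filter on the cluster and materialise the result as a new Spark table
df_subscribers <- df %>%
  filter(y == 1) %>%
  compute("df_subscribers")

# or pull the filtered rows back into local R memory as a tibble
df_local <- df %>%
  filter(y == 1) %>%
  collect()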

While I’m not using them here, sparklyr offers a range of functions that allow us to access the Spark tools for transforming/preprocessing data. Happily, these are all prefixed ft_, making it easy to see what’s available. For example, we could use ft_binarizer() to create binary values from a column in our Spark DataFrame by providing a threshold value (there’s a sketch of this after the printout below). These functions are important because we can’t just run any arbitrary R code on df. This is because it’s a connection to a DataFrame stored on Spark, rather than a standard R data frame. We can see this when we print df out.

df
## # Source:   table<df> [?? x 17]
## # Database: spark_connection
##      age   job marital education default balance housing  loan contact
##    <int> <int>   <int>     <int>   <int>   <int>   <int> <int>   <int>
##  1    30    11       2         1       1    1787       1     1       1
##  2    33     8       2         2       1    4789       2     2       1
##  3    35     5       3         3       1    1350       2     1       1
##  4    30     5       2         3       1    1476       2     2       3
##  5    59     2       2         2       1       0       2     1       3
##  6    35     5       3         3       1     747       1     1       1
##  7    36     7       2         3       1     307       2     1       1
##  8    39    10       2         2       1     147       2     1       1
##  9    41     3       2         3       1     221       2     1       3
## 10    43     8       2         1       1     -88       2     2       1
## # ... with more rows, and 8 more variables: day <int>, month <int>,
## #   duration <int>, campaign <int>, pdays <int>, previous <int>,
## #   poutcome <int>, y <dbl>
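
As a rough sketch of one of the ft_ functions mentioned above, here’s how ft_binarizer() might be used to flag long calls (argument names have changed across sparklyr versions, so check the documentation for the version you’re running):

# the Binarizer expects a double column, so cast duration first,
# then flag calls longer than five minutes (300 seconds)
df %>%
  mutate(duration = as.numeric(duration)) %>%
  ft_binarizer(input.col = "duration",
               output.col = "long_call",
               threshold = 300)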

If we do want to run arbitrary R code on a Spark DataFrame we can use spark_apply(). Alternatively, we could collect() our data into a standard R data frame, do our manipulation, and then copy_to() the result back to Spark. In practice, this is unlikely to be an option – why would we bother with a computer cluster if our data fits into R?!
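
As a minimal sketch, spark_apply() takes an R function and runs it on each partition of the Spark DataFrame (so R needs to be available on the worker nodes):

# rescale duration from seconds to minutes using plain R code,
# executed partition-by-partition on the cluster
df %>%
  spark_apply(function(d) {
    d$duration <- d$duration / 60
    d
  })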

Fitting a random forest

Partitioning the data

sparklyr gives us an easy way to partition a Spark DataFrame. Here we’re creating a copy of df in Spark with compute(), before partitioning this copy into test and training data. We can then access the partitions with, e.g., df_part$test. We can have as many partitions as we want, so in practice we’d probably want to do k-fold cross-validation (see below).

df_part <- df %>%
  compute("df_part") %>%
  sdf_partition(test = 0.2, train = 0.8, seed = 2017)
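
Each partition is just an element of a named list, and each element is itself a tbl_spark, so we can check the split looks sensible (a quick sketch; the counts are computed on the cluster):

df_part$train %>% summarise(n = n())
df_part$test %>% summarise(n = n())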

Fit the model

From sparklyr we can access a range of models available to use with Spark. They are all prefixed with ml_. To run a random forest we can use ml_random_forest(). One thing that is a bit confusing if you’re working with the MLlib documentation is that some of the parameter names are quite different for the sparklyr functions compared to what they’re called by MLlib. For example, the proportion of the predictors to use for making splits of a random forest is called featureSubsetStrategy in the MLlib documentation but col.sample.rate by sparklyr. Another example is that MLlib calls the number of trees for gradient boosted trees numIterations whereas sparklyr calls it num.trees.

fit_rf <- ml_random_forest(
  df_part$train, # the training partion
  response = "y",
  features = colnames(df_part$train)[1:16], # the names minus the outcome
  impurity = "entropy",
  max.bins = 32L, # default = 32L
  max.depth = 5L, # default = 5L
  num.trees = 100L,  
  learn.rate = 0.1, # default = 0.1
  col.sample.rate = 0.25, # aka featureSubsetStrategy
  type = "classification",
  seed = 2017
)

Evaluate the model

We can get the feature importances for the model by providing our Spark connection and the model object.

ml_tree_feature_importance(sc = sc, model = fit_rf)
##             importance   feature
## 1    0.513011341044725  duration
## 2   0.0802082038509115  poutcome
## 3    0.059010844561907       age
## 4   0.0538740319352658     pdays
## 5   0.0500510804148416     month
## 6   0.0494220628070373   contact
## 7   0.0437808856900149  previous
## 8   0.0356504629964395       day
## 9   0.0294190550796405   balance
## 10  0.0207771897883123       job
## 11  0.0196039047639807   housing
## 12    0.01626807994962  campaign
## 13  0.0109601402422286   marital
## 14 0.00964582937505621      loan
## 15 0.00699162595376989 education
## 16 0.00132526154624899   default

We can make predictions from a model for some data with sdf_predict(). MLlib also has built-in evaluation for models, which can be accessed through sparklyr. Below we are making predictions for the training data, then getting the F1 score. Note that the data we provide to sdf_predict() is in Spark, not R; we’re just providing the R connection to the Spark data. Assuming we’re using Spark 2.X, we could alternatively set the metric to weightedPrecision, weightedRecall or accuracy. More metrics are available for Spark 1.6.

fit_rf %>%
  # predict from the model for the training data
  sdf_predict(df_part$train) %>%
  ml_classification_eval(label = "y",
                         predicted_lbl = "prediction",
                         metric = "f1") # default, F1 score 
## [1] 0.858982
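
The same call works for the other Spark 2.X metrics mentioned above; for example, a sketch of getting the overall accuracy instead:

fit_rf %>%
  sdf_predict(df_part$train) %>%
  ml_classification_eval(label = "y",
                         predicted_lbl = "prediction",
                         metric = "accuracy")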

Next we can do predictions for the test data

test_rf <- sdf_predict(fit_rf, df_part$test)

And calculate the F1 score again.

(test_rf_f1 <- test_rf %>%
    ml_classification_eval(label = "y",
                           predicted_lbl = "prediction",
                           metric = "f1")) # default, F1 score 
## [1] 0.8640745

The random forest model returns probabilities as well as predicted categories, unlike some other models (e.g. gradient boosted trees). As our outcome is also binary, we can use ml_binary_classification_eval() to get back the area under the ROC curve for our model.

(rf_test_auc <- test_rf %>%
    ml_binary_classification_eval(label = "y",
                                  score = "probability")) 
## [1] 0.8911765

This information is a little limited though. Data size permitting, we could collect() our DataFrame into R to get a tibble of the data used to make the predictions, with prediction and probability columns added.

rf_test_df <- collect(test_rf)

We can then use caret::confusionMatrix() to get a more exhaustive set of performance metrics. From the output it seems the RF is no better than just predicting the majority category for everyone (the No Information Rate).

confusionMatrix(data = rf_test_df$prediction, reference = rf_test_df$y)
## Confusion Matrix and Statistics
## 
##           Reference
## Prediction   0   1
##          0 822  89
##          1   8  13
##                                           
##                Accuracy : 0.8959          
##                  95% CI : (0.8745, 0.9148)
##     No Information Rate : 0.8906          
##     P-Value [Acc > NIR] : 0.3222          
##                                           
##                   Kappa : 0.1808          
##  Mcnemar's Test P-Value : 4.557e-16       
##                                           
##             Sensitivity : 0.9904          
##             Specificity : 0.1275          
##          Pos Pred Value : 0.9023          
##          Neg Pred Value : 0.6190          
##              Prevalence : 0.8906          
##          Detection Rate : 0.8820          
##    Detection Prevalence : 0.9775          
##       Balanced Accuracy : 0.5589          
##                                           
##        'Positive' Class : 0               
## 

We could also look at those predicted probabilities.

rf_test_df <- rf_test_df %>%
  mutate(prob_no = unlist(map(probability, 1)),
         prob_yes = unlist(map(probability, 2)))

ggplot(rf_test_df, aes(x = prob_no, y = duration, fill = as.factor(y))) +
  geom_point(alpha = .5, color = "black", shape = 21)

K-fold cross-validation

Clearly a better way to evaluate a model is k-fold cross-validation. This is pretty easy to do with sparklyr, and I’ve provided a function below to automate the process, at least for classification. The first thing we’ll need to do is create some partitions.

k = 5 # number of folds

# the proportion of the data to go in each fold
weights <- rep(1/k, times = k)
# our vector of weights needs to be named to work below
names(weights) <- paste0("fold", as.character(1:k))

# create a copy of df then partition it into 5 partitions 
# with 0.2 of the data in each one
df_cv <- df %>%
  compute("df_cv") %>%
  sdf_partition(weights = weights,
                seed = 2017)

Now, let’s train a model on partitions 1 to 4, and test on 5, printing out the AUC at the end.

train_1 <- sdf_bind_rows(df_cv[1:4])

fit_rf_1 <- ml_random_forest(
  train_1,
  response = "y",
  features = colnames(df_cv$fold1)[1:16],
  impurity = "entropy",
  max.bins = 32L, # default = 32L
  max.depth = 6L, # default = 5L
  num.trees = 100L,  
  col.sample.rate = 0.25, # aka featureSubsetStrategy
  type = "classification",
  seed = 2017
)

# Get the AUC for the left out fold
sdf_predict(fit_rf_1, df_cv$fold5) %>%
  ml_binary_classification_eval(label = "y",
                                score = "probability")
## [1] 0.8876745

It would be a lot better to create a function to do cross-validation where we have control over \(k\) and the model that is used.

ml_classification_cross_validation <- function(
  data,            # a tbl_spark
  response,        # the response column as a character
  features = NULL, # character vector or else all the columns except response
  model_fun,       # the modelling function to use (unquoted)
  k,               # number of folds
  ...) {           # additional arguments
  
  # first off create weights for partitioning the data
  weights <- rep(1 / k, times = k)
  # name the elements of weights
  names(weights) <- paste0("fold", as.character(1:k))
  
  # partition the data using weights
  data_cv <- sdf_partition(data, weights = weights)
  
  # get the indices for our different training sets
  # e.g. a dataFrame for partitions 1, 2, 3, and 4
  K <- 1:k
  indices <- purrr::map(K, ~ K[-.x])
  
  # create our training sets by binding together partitions of data_cv.
  # We get back a list where each element is a DataFrame made by binding
  # together the partitions indexed by the corresponding element of indices
  data_splits <- purrr::map(indices, ~ sdf_bind_rows(data_cv[.x]))
  
  # If a vector of feature names hasn't been specified
  if (is.null(features)) {
    # Get the column names of the data
    columns <- colnames(data_splits[[1]])
    # Create the feature names by using all the columns except the response
    features <- columns[columns != response]
  }
  
  # Map the specified model_fun over each of our training sets (the elements
  # of data_splits)
  fits <- purrr::map(.x = data_splits,
                     .f = purrr::as_mapper(model_fun),
                     response = response,
                     features = features,
                     type = "classification",
                     ...)
  
  # Get some predictions for each model for the partition that wasn't used
  # to train that model.
  preds <- purrr::map2(fits, K, ~ sdf_predict(.x, data_cv[[.y]]))
  
  # Check whether a model that outputs probabilities was used.
  # If so, the relevant evaluation function is used.
  # (Should also test whether the response is binary, but doesn't at the moment)
  if (any(stringr::str_detect(colnames(preds[[1]]), "probability"))) {
    evals <- purrr::map(preds,
                        ml_binary_classification_eval,
                        label = response,
                        score = "probability")
  } else {
    evals <- purrr::map(preds,
                        ml_classification_eval,
                        predicted_lbl = "prediction",
                        label = response)
  }
  # This is what will be returned
  list(fits = fits,
       predictions = preds,
       evals = evals)
  
}

Now this function can be used to do classification with whatever model we like. E.g. gradient boosted trees:

gbt_5_fold_cv <- 
  ml_classification_cross_validation(df, 
                                     response = "y",
                                     model_fun = ml_gradient_boosted_trees,
                                     k = 5,
                                     max.depth = 3L,
                                     num.trees = 10L,
                                     seed = 2017)

We can then get the average of our evaluation metric across the folds. Since gradient boosted trees don’t return probabilities (see above), the function falls back to ml_classification_eval(), so here it will be the mean F1 score.

mean(unlist(gbt_5_fold_cv$evals))
## [1] 0.8680992

Disconnect

Disconnect from our Spark cluster.

spark_disconnect(sc)

Further reading

Hopefully this post has been useful for providing an idea of what a sparklyr workflow could look like. In particular, the function to do cross validation should hopefully be useful for some people. Clearly, there’s loads of stuff I didn’t cover in this post. Most obviously, I haven’t gone into using dplyr verbs with a tbl_spark object. For examples of what this looks like you can see here. DataCamp also have a course on sparklyr, which I found very useful. The MLlib Main Guide is a good place to start for an overview of the features it offers, but you’ll want to look at the API Guide to get more information on the models.