Econometrics and Free Software by Bruno Rodrigues.
RSS feed for blog post updates.
Follow me on Mastodon, twitter, or check out my Github.
Check out my package that adds logging to R functions, {chronicler}.
Or read my free ebooks, to learn some R and build reproducible analytical pipelines..
You can also watch my youtube channel or find the slides to the talks I've given here.
Buy me a coffee, my kids don't let me sleep.

Getting {sparklyr}, {h2o}, {rsparkling} to work together and some fun with bash

R

This is going to be the type of blog posts that would perhaps be better as a gist, but it is easier for me to use my blog as my own personal collection of gists. Plus, someone else might find this useful, so here it is! In this blog post I am going to show a little trick to randomly sample rows from a text file using bash, and then train a model using the {h2o} package. I will also use the {rsparkling} package. From {rsparkling}’s documentation: {rsparkling} is a package that provides an R interface to the H2O Sparkling Water machine learning library. and will be needed to transfer the data from Spark to H2O.

In a previous blog post I used the {sparklyr} package to load a 30GB csv file into R. I created the file by combining around 300 csv files, each around 80MB big. Here, I would like to use the machine learning functions included in the {h2o} packages to train a random forest on this data. However, I only want to have a simple prototype that simply runs, and check if all the packages work well together. If everything is ok, I’ll keep iterating to make the model better (in a possible subsequent post).

For fast prototyping, using 30GB of data is not a good idea, so I am going to sample 500000 from this file using the linux command line (works on macOS too and also on Windows if you installed the linux subsystem). Why not use R to sample 500000 rows? Because on my machine, loading the 30GB file takes 25 minutes. Sampling half a million lines from it would take quite long too. So here are some bash lines that do that directly on the file, without needing to load it into R beforehand:

[18-03-03 21:50] brodriguesco in /Documents/AirOnTimeCSV ➤ get_seeded_random()
{
  seed="$1"
  openssl enc -aes-256-ctr -pass pass:"$seed" -nosalt \
  </dev/zero 2>/dev/null
}

[18-03-03 21:50] brodriguesco in /Documents/AirOnTimeCSV ➤ sed "1 d" combined.csv | shuf --random-source=<(get_seeded_random 42) -n 500000 > small_combined_temp.csv

[18-03-03 21:56] brodriguesco in /Documents/AirOnTimeCSV ➤ head -1 combined.csv > colnames.csv

[18-03-03 21:56] brodriguesco in /Documents/AirOnTimeCSV ➤ cat colnames.csv small_combined_temp.csv > small_combined.csv

The first function I took from the gnu coreutils manual which allows me to fix the random seed to reproduce the same sampling of the file. Then I use "sed 1 d" cobmined.csv to remove the first line of combined.csv which is the header of the file. Then, I pipe the result of sed using | to shuf which does the shuffling. The option --random-source=<(get_seeded_random 42) fixes the seed, and -n 500000 only shuffles 500000 and not the whole file. The final bit of the line, > small_combined_temp.csv, saves the result to small_cobmined_temp.csv. Because I need to add back the header, I use head -1 to extract the first line of combined.csv and save it into colnames.csv. Finally, I bind the rows of both files using cat colnames.csv small_combined_temp.csv and save the result into small_combined.cvs. Taken together, all these steps took about 5 minutes (without counting the googling around for finding how to pass a fixed seed to shuf).

Now that I have this small dataset, I can write a small prototype:

First, you need to install {sparklyr}, {rsparkling} and {h2o}. Refer to this to know how to install the packages. I had a mismatch between the version of H2O that was automatically installed when I installed the {h2o} package, and the version of Spark that {sparklyr} installed but thankfully the {h2o} package returns a very helpful error message with the following lines:

detach("package:rsparkling", unload = TRUE)
                       if ("package:h2o" %in% search()) { detach("package:h2o", unload = TRUE) }
                       if (isNamespaceLoaded("h2o")){ unloadNamespace("h2o") }
                       remove.packages("h2o")
                       install.packages("h2o", type = "source", repos = "https://h2o-release.s3.amazonaws.com/h2o/rel-weierstrass/2/R")

which tells you which version to install.

So now, let’s load everything:

library(sparklyr)
library(rsparkling)
library(h2o)
## 
## ----------------------------------------------------------------------
## 
## Your next step is to start H2O:
##     > h2o.init()
## 
## For H2O package documentation, ask for help:
##     > ??h2o
## 
## After starting H2O, you can use the Web UI at http://localhost:54321
## For more information visit http://docs.h2o.ai
## 
## ----------------------------------------------------------------------
## 
## Attaching package: 'h2o'
## The following objects are masked from 'package:stats':
## 
##     cor, sd, var
## The following objects are masked from 'package:base':
## 
##     &&, %*%, %in%, ||, apply, as.factor, as.numeric, colnames,
##     colnames<-, ifelse, is.character, is.factor, is.numeric, log,
##     log10, log1p, log2, round, signif, trunc
h2o.init()
## 
## H2O is not running yet, starting it now...
## 
## Note:  In case of errors look at the following log files:
##     /tmp/Rtmph48vf9/h2o_cbrunos_started_from_r.out
##     /tmp/Rtmph48vf9/h2o_cbrunos_started_from_r.err
## 
## 
## Starting H2O JVM and connecting: .. Connection successful!
## 
## R is connected to the H2O cluster: 
##     H2O cluster uptime:         1 seconds 944 milliseconds 
##     H2O cluster version:        3.16.0.2 
##     H2O cluster version age:    4 months and 15 days !!! 
##     H2O cluster name:           H2O_started_from_R_cbrunos_bpn152 
##     H2O cluster total nodes:    1 
##     H2O cluster total memory:   6.98 GB 
##     H2O cluster total cores:    12 
##     H2O cluster allowed cores:  12 
##     H2O cluster healthy:        TRUE 
##     H2O Connection ip:          localhost 
##     H2O Connection port:        54321 
##     H2O Connection proxy:       NA 
##     H2O Internal Security:      FALSE 
##     H2O API Extensions:         XGBoost, Algos, AutoML, Core V3, Core V4 
##     R Version:                  R version 3.4.4 (2018-03-15)
## Warning in h2o.clusterInfo(): 
## Your H2O cluster version is too old (4 months and 15 days)!
## Please download and install the latest version from http://h2o.ai/download/

I left all the startup messages because they’re quite helpful. Especially that bit telling you to start H2O with h2o.init(). If something’s wrong, h2o.init() will give you helpful information.

Now that all this is loaded, I can start working on the data (the steps below are explained in detail in my previous blog post):

spark_dir = "/my_2_to_disk/spark/"

config = spark_config()

config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
config$`spark.yarn.executor.memoryOverhead` <- "512"
config$`sparklyr.shell.driver-java-options` = paste0("-Djava.io.tmpdir=", spark_dir)

sc = spark_connect(master = "local", config = config)

Another useful function that allows you to check if everything is alright is h2o_context():

h2o_context(sc)
<jobj[12]>
  org.apache.spark.h2o.H2OContext

Sparkling Water Context:
 * H2O name: sparkling-water-cbrunos_local-1520111879840
 * cluster size: 1
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (driver,127.0.0.1,54323)
  ------------------------

  Open H2O Flow in browser: http://127.0.0.1:54323 (CMD + click in Mac OSX)

Now, let’s load the data into R with {sparklyr}:

air = spark_read_csv(sc, name = "air", path = "small_combined.csv")

Of course, here, using Spark is overkill, because small_combined.csv is only around 100MB big, so no need for {sparklyr} but as stated in the beginning this is only to have a quick and dirty prototype. Once all the pieces are working together, I can iterate on the real data, for which {sparklyr} will be needed. Now, if I needed to use {dplyr} I could use it on air, but I don’t want to do anything on it, so I convert it to a h2o data frame. h2o data frames are needed as arguments for the machine learning algorithms included in the {h2o} package. as_h2o_frame() is a function included in {rsparkling}:

air_hf = as_h2o_frame(sc, air)

Then, I convert the columns I need to factors (I am only using factors here):

air_hf$ORIGIN = as.factor(air_hf$ORIGIN)
air_hf$UNIQUE_CARRIER = as.factor(air_hf$UNIQUE_CARRIER)
air_hf$DEST = as.factor(air_hf$DEST)

{h2o} functions need the names of the predictors and of the target columns, so let’s define that:

target = "ARR_DELAY"
predictors = c("UNIQUE_CARRIER", "ORIGIN", "DEST")

Now, let’s train a random Forest, without any hyper parameter tweaking:

model = h2o.randomForest(predictors, target, training_frame = air_hf)

Now that this runs, I will in the future split the data into training, validation and test set, and train a model with better hyper parameters. For now, let’s take a look at the summary of model:

summary(model)
Model Details:
==============

H2ORegressionModel: drf
Model Key:  DRF_model_R_1520111880605_1
Model Summary:
  number_of_trees number_of_internal_trees model_size_in_bytes min_depth
1              50                       50            11055998        20
  max_depth mean_depth min_leaves max_leaves mean_leaves
1        20   20.00000       1856       6129  4763.42000

H2ORegressionMetrics: drf
** Reported on training data. **
** Metrics reported on Out-Of-Bag training samples **

MSE:  964.9246
RMSE:  31.06324
MAE:  17.65517
RMSLE:  NaN
Mean Residual Deviance :  964.9246





Scoring History:
             timestamp   duration number_of_trees training_rmse training_mae
1  2018-03-03 22:52:24  0.035 sec               0
2  2018-03-03 22:52:25  1.275 sec               1      30.93581     17.78216
3  2018-03-03 22:52:25  1.927 sec               2      31.36998     17.78867
4  2018-03-03 22:52:26  2.272 sec               3      31.36880     17.80359
5  2018-03-03 22:52:26  2.564 sec               4      31.29683     17.79467
6  2018-03-03 22:52:26  2.854 sec               5      31.31226     17.79467
7  2018-03-03 22:52:27  3.121 sec               6      31.26214     17.78542
8  2018-03-03 22:52:27  3.395 sec               7      31.20749     17.75703
9  2018-03-03 22:52:27  3.666 sec               8      31.19706     17.74753
10 2018-03-03 22:52:27  3.935 sec               9      31.16108     17.73547
11 2018-03-03 22:52:28  4.198 sec              10      31.13725     17.72493
12 2018-03-03 22:52:32  8.252 sec              27      31.07608     17.66648
13 2018-03-03 22:52:36 12.462 sec              44      31.06325     17.65474
14 2018-03-03 22:52:38 14.035 sec              50      31.06324     17.65517
   training_deviance
1
2          957.02450
3          984.07580
4          984.00150
5          979.49147
6          980.45794
7          977.32166
8          973.90720
9          973.25655
10         971.01272
11         969.52856
12         965.72249
13         964.92530
14         964.92462

Variable Importances: (Extract with `h2o.varimp`)
=================================================

Variable Importances:
        variable relative_importance scaled_importance percentage
1         ORIGIN    291883392.000000          1.000000   0.432470
2           DEST    266749168.000000          0.913890   0.395230
3 UNIQUE_CARRIER    116289536.000000          0.398411   0.172301
>

If you found this blog post useful, you might want to follow me on twitter for blog post updates.