
Selecting the Max Value from Each Group, a Case Study: dplyr and sparklyr

Introduction

In my last post, we looked at how to use the excellent data.table package to slice a data.table by group, obtaining the rows for which a particular column is at its maximum value within that group. In this post, we will take a look at how to perform the same task using dplyr and sparklyr.

dplyr

First, let’s take a look at our data.

library(dplyr)
mtcars <- mtcars %>% 
  tibble::rownames_to_column(var = "car") %>% 
  tibble::as_tibble()
mtcars
# # A tibble: 32 x 12
#    car           mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
#    <chr>       <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#  1 Mazda RX4    21       6  160    110  3.9   2.62  16.5     0     1     4     4
#  2 Mazda RX4 …  21       6  160    110  3.9   2.88  17.0     0     1     4     4
#  3 Datsun 710   22.8     4  108     93  3.85  2.32  18.6     1     1     4     1
#  4 Hornet 4 D…  21.4     6  258    110  3.08  3.22  19.4     1     0     3     1
#  5 Hornet Spo…  18.7     8  360    175  3.15  3.44  17.0     0     0     3     2
#  6 Valiant      18.1     6  225    105  2.76  3.46  20.2     1     0     3     1
#  7 Duster 360   14.3     8  360    245  3.21  3.57  15.8     0     0     3     4
#  8 Merc 240D    24.4     4  147.    62  3.69  3.19  20       1     0     4     2
#  9 Merc 230     22.8     4  141.    95  3.92  3.15  22.9     1     0     4     2
# 10 Merc 280     19.2     6  168.   123  3.92  3.44  18.3     1     0     4     4
# # … with 22 more rows

So here, we are interested in getting a single car from each cyl group: the one whose mpg is at the maximum for that group. I really like the dplyr syntax for this problem; it's straightforward. Take a look below:

mtcars %>% 
  group_by(cyl) %>% 
  arrange(desc(mpg)) %>% 
  slice(1) %>% 
  ungroup()
# # A tibble: 3 x 12
#   car            mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
#   <chr>        <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
# 1 Toyota Coro…  33.9     4  71.1    65  4.22  1.84  19.9     1     1     4     1
# 2 Hornet 4 Dr…  21.4     6 258     110  3.08  3.22  19.4     1     0     3     1
# 3 Pontiac Fir…  19.2     8 400     175  3.08  3.84  17.0     0     0     3     2

We tell dplyr to create groups of data for each of the cyl levels, and then within each group we arrange() by mpg in descending order. Once we have our data organised in this way, it's as simple as taking the top row from each group using slice(). Of course, there is more than one way to achieve this task with dplyr; take this next example, for instance.

mtcars %>% 
  group_by(cyl) %>% 
  arrange(desc(mpg)) %>% 
  mutate(row_number = row_number()) %>% 
  filter(row_number == 1) %>% 
  select(-row_number) %>% 
  ungroup()
# # A tibble: 3 x 12
#   car            mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
#   <chr>        <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
# 1 Toyota Coro…  33.9     4  71.1    65  4.22  1.84  19.9     1     1     4     1
# 2 Hornet 4 Dr…  21.4     6 258     110  3.08  3.22  19.4     1     0     3     1
# 3 Pontiac Fir…  19.2     8 400     175  3.08  3.84  17.0     0     0     3     2

This might not seem the most logical approach to the problem, given that we have access to the slice() function, but it feeds nicely into the sparklyr section.
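
As an aside, if your version of dplyr is recent enough (1.0.0 or later), the slice_max() function performs this operation in a single step, without the explicit arrange(); a quick sketch:

mtcars %>% 
  group_by(cyl) %>% 
  slice_max(mpg, n = 1, with_ties = FALSE) %>%  # with_ties = FALSE guarantees one row per group
  ungroup()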

If you are interested, the slice() and mutate() approaches can be benchmarked as sketched below. The slice() approach is much quicker than the mutate() approach, which we would expect, since there is much less manipulation of the data going on in the first approach.
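
A minimal sketch of such a benchmark, assuming the microbenchmark package is installed:

library(microbenchmark)
microbenchmark(
  slice = mtcars %>% 
    group_by(cyl) %>% 
    arrange(desc(mpg)) %>% 
    slice(1) %>% 
    ungroup(),
  mutate_filter = mtcars %>% 
    group_by(cyl) %>% 
    arrange(desc(mpg)) %>% 
    mutate(row_number = row_number()) %>% 
    filter(row_number == 1) %>% 
    select(-row_number) %>% 
    ungroup(),
  times = 100
)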

sparklyr

A great feature of dplyr is its ability to execute your R code on a Spark cluster. To achieve this, dplyr uses the dbplyr package, which translates your dplyr code into Spark SQL; that SQL can then be passed over the Spark connection and executed by your Spark cluster. The problem, however, is that not all dplyr verbs translate.

First, let's set up a local Spark cluster and copy the mtcars data to it.

library(sparklyr)
sc <- spark_connect(master = "local")
mtcars_spark <- copy_to(sc, mtcars, "mtcars")
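
If you want to confirm that the data arrived, you can list the tables registered with the connection (a quick sanity check; src_tbls() is the dplyr generic, and DBI::dbListTables() would work just as well):

src_tbls(sc)  # should list "mtcars"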

Now we can explore our first dplyr example by attempting to execute it on the Spark cluster. We use dbplyr::sql_render() as the final step of the chain to see the SQL code that dbplyr translates the dplyr code into.

mtcars_spark %>% 
  group_by(cyl) %>% 
  arrange(desc(mpg)) %>% 
  slice(1) %>% 
  ungroup() %>% 
  dbplyr::sql_render()
# Error in slice_.tbl_spark(.data, .dots = compat_as_lazy_dots(...)): Slice is not supported in this version of sparklyr

In this instance, dplyr tells us that we cannot use slice(), since it is not currently supported by sparklyr; there is no direct translation from slice() to Spark SQL code. So let's try our second approach.

mtcars_spark %>% 
  group_by(cyl) %>% 
  arrange(desc(mpg)) %>% 
  mutate(row_number = row_number()) %>% 
  filter(row_number == 1) %>% 
  select(-row_number) %>% 
  ungroup() %>% 
  dbplyr::sql_render()
# <SQL> SELECT `car`, `mpg`, `cyl`, `disp`, `hp`, `drat`, `wt`, `qsec`, `vs`, `am`, `gear`, `carb`
# FROM (SELECT `car`, `mpg`, `cyl`, `disp`, `hp`, `drat`, `wt`, `qsec`, `vs`, `am`, `gear`, `carb`, ROW_NUMBER() OVER (PARTITION BY `cyl` ORDER BY `mpg` DESC) AS `row_number`
# FROM (SELECT *
# FROM `mtcars`
# ORDER BY `mpg` DESC) `dbplyr_001`) `dbplyr_002`
# WHERE (`row_number` = 1.0)

Here we see that the function row_number() does translate: it is a ranking function which mimics the window functions described in SQL:2003 (see ?ranking), so dbplyr knows the equivalent SQL code.
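
As an aside, because sparklyr connections also implement the DBI interface, you could run essentially the same query by hand. Below is a sketch, assuming the "mtcars" table registered by copy_to() above (the subquery structure is simplified slightly compared with dbplyr's generated SQL):

library(DBI)
dbGetQuery(sc, "
  SELECT car, mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY cyl ORDER BY mpg DESC) AS row_number
    FROM mtcars
  ) ranked
  WHERE row_number = 1
")

Returning to the dplyr pipeline, collecting the results brings them back into R as a tibble: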

mtcars_spark %>% 
  group_by(cyl) %>% 
  arrange(desc(mpg)) %>% 
  mutate(row_number = row_number()) %>% 
  filter(row_number == 1) %>% 
  select(-row_number) %>% 
  ungroup() %>% 
  collect()
# # A tibble: 3 x 12
#   car            mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
#   <chr>        <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
# 1 Hornet 4 Dr…  21.4     6 258     110  3.08  3.22  19.4     1     0     3     1
# 2 Toyota Coro…  33.9     4  71.1    65  4.22  1.84  19.9     1     1     4     1
# 3 Pontiac Fir…  19.2     8 400     175  3.08  3.84  17.0     0     0     3     2

So, when it comes to using dplyr for data manipulation in Spark, it sometimes takes some persistence to find a function which will translate nicely to Spark SQL, especially if you want to avoid the often slow spark_apply() function for applying an R function to a Spark object. I would recommend reading the sparklyr documentation, but it can be a little light on detail; for a more detailed look at how to send R code to be executed on your Spark cluster, check out my colleague Jozef's blog post.

As a side note, should you not be able to find an R function which translates to SQL code, it is always worth checking the list of Hive Operators and User-Defined Functions (UDFs). UDFs are purpose-built functions for performing mathematical, arithmetic, logical and relational operations on table columns.
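
Helpfully, dbplyr leaves any function it does not recognise untranslated, so you can often call a Hive UDF directly from within mutate() or summarise(). Here is a sketch using Hive's percentile_approx() UDAF to find an approximate median mpg per group; this assumes the function is available on your cluster:

mtcars_spark %>% 
  group_by(cyl) %>% 
  summarise(median_mpg = percentile_approx(mpg, 0.5)) %>%  # passed through verbatim to Spark SQL
  collect()

Finally, once you are finished with the cluster, it is good practice to close the connection:

spark_disconnect(sc)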

Conclusion

This concludes this series of blog posts, in which we have seen how to select a single row from a data.frame, data.table or tibble for each group, where a column in that group is at its maximum value. In this post, we saw how easy this task is with dplyr's group_by() and slice() combination of functions. We then saw how our dplyr code can be translated and executed as SQL code on a Spark cluster, though not all dplyr "verbs" currently translate into SQL. When there is no direct translation for a dplyr function, it is often worth checking whether a Hive User-Defined Function can perform the data manipulation task at hand.