Introduction
In my last post, we looked at how to use the excellent data.table package to slice a data.table by group, obtaining the rows in each group where a particular column is at its maximum value. In this post, we will take a look at how to perform the same task using dplyr and sparklyr.
dplyr
First, let’s take a look at our data.
library(dplyr)

# Move the row names into a proper column and convert to a tibble
mtcars <- mtcars %>%
  tibble::rownames_to_column(var = "car") %>%
  tibble::as_tibble()
mtcars
# # A tibble: 32 x 12
# car mpg cyl disp hp drat wt qsec vs am gear carb
# <chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
# 1 Mazda RX4 21 6 160 110 3.9 2.62 16.5 0 1 4 4
# 2 Mazda RX4 … 21 6 160 110 3.9 2.88 17.0 0 1 4 4
# 3 Datsun 710 22.8 4 108 93 3.85 2.32 18.6 1 1 4 1
# 4 Hornet 4 D… 21.4 6 258 110 3.08 3.22 19.4 1 0 3 1
# 5 Hornet Spo… 18.7 8 360 175 3.15 3.44 17.0 0 0 3 2
# 6 Valiant 18.1 6 225 105 2.76 3.46 20.2 1 0 3 1
# 7 Duster 360 14.3 8 360 245 3.21 3.57 15.8 0 0 3 4
# 8 Merc 240D 24.4 4 147. 62 3.69 3.19 20 1 0 4 2
# 9 Merc 230 22.8 4 141. 95 3.92 3.15 22.9 1 0 4 2
# 10 Merc 280 19.2 6 168. 123 3.92 3.44 18.3 1 0 4 4
# # … with 22 more rows
So here, we are interested in getting a single car from each cyl group, namely the one whose mpg is at the maximum for that group. I really like the dplyr syntax for this problem; it’s really straightforward. Take a look below:
mtcars %>%
  group_by(cyl) %>%
  arrange(desc(mpg)) %>%
  slice(1) %>%
  ungroup()
# # A tibble: 3 x 12
# car mpg cyl disp hp drat wt qsec vs am gear carb
# <chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
# 1 Toyota Coro… 33.9 4 71.1 65 4.22 1.84 19.9 1 1 4 1
# 2 Hornet 4 Dr… 21.4 6 258 110 3.08 3.22 19.4 1 0 3 1
# 3 Pontiac Fir… 19.2 8 400 175 3.08 3.84 17.0 0 0 3 2
We tell dplyr to create groups of data, one for each of the cyl levels, and then within each group we arrange() by mpg in descending order. Once we have our data organised in this way, it’s as simple as taking the top row from each group using slice(). Of course, there is more than one way we can achieve this task using dplyr; take this next example, for instance.
mtcars %>%
  group_by(cyl) %>%
  arrange(desc(mpg)) %>%
  mutate(row_number = row_number()) %>%
  filter(row_number == 1) %>%
  select(-row_number) %>%
  ungroup()
# # A tibble: 3 x 12
# car mpg cyl disp hp drat wt qsec vs am gear carb
# <chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
# 1 Toyota Coro… 33.9 4 71.1 65 4.22 1.84 19.9 1 1 4 1
# 2 Hornet 4 Dr… 21.4 6 258 110 3.08 3.22 19.4 1 0 3 1
# 3 Pontiac Fir… 19.2 8 400 175 3.08 3.84 17.0 0 0 3 2
It might not seem to be the most logical approach to this problem given we have access to the slice() function, but it feeds nicely into the sparklyr section.
If you are interested, below you can see how to benchmark these two dplyr approaches. The slice() approach is much quicker than the mutate() approach, which we would expect, since the first approach involves far less manipulation of the data.
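Below is a minimal sketch of how such a benchmark could be run, assuming the microbenchmark package is available (bench::mark() would work equally well).
library(microbenchmark)

# Time both approaches over repeated runs
microbenchmark(
  slice = mtcars %>%
    group_by(cyl) %>%
    arrange(desc(mpg)) %>%
    slice(1) %>%
    ungroup(),
  mutate_filter = mtcars %>%
    group_by(cyl) %>%
    arrange(desc(mpg)) %>%
    mutate(row_number = row_number()) %>%
    filter(row_number == 1) %>%
    select(-row_number) %>%
    ungroup()
)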
sparklyr
A great feature of dplyr is its ability to execute your R code on a Spark cluster. To achieve this, dplyr uses the dbplyr package, which translates your dplyr code into Spark SQL code that can then be passed to the Spark connection and executed by your Spark cluster. The problem, however, is that not all dplyr verbs translate.
First, let’s set up a local Spark cluster and upload the mtcars data to it.
library(sparklyr)

# Connect to a local Spark instance and copy the data over
sc <- spark_connect(master = "local")
mtcars_spark <- copy_to(sc, mtcars, "mtcars")
Now we can explore our first dplyr example by attempting to execute it on the Spark cluster. We use dbplyr::sql_render() as the final part of the chain to see the SQL code that dbplyr translates the dplyr code into.
mtcars_spark %>%
  group_by(cyl) %>%
  arrange(desc(mpg)) %>%
  slice(1) %>%
  ungroup() %>%
  dbplyr::sql_render()
# Error in slice_.tbl_spark(.data, .dots = compat_as_lazy_dots(...)): Slice is not supported in this version of sparklyr
In this instance, dplyr tells us that we cannot use slice(), since it is not currently supported by sparklyr; this is because there is no direct translation from slice() to Spark SQL code. So let’s try our second approach.
mtcars_spark %>%
  group_by(cyl) %>%
  arrange(desc(mpg)) %>%
  mutate(row_number = row_number()) %>%
  filter(row_number == 1) %>%
  select(-row_number) %>%
  ungroup() %>%
  dbplyr::sql_render()
# <SQL> SELECT `car`, `mpg`, `cyl`, `disp`, `hp`, `drat`, `wt`, `qsec`, `vs`, `am`, `gear`, `carb`
# FROM (SELECT `car`, `mpg`, `cyl`, `disp`, `hp`, `drat`, `wt`, `qsec`, `vs`, `am`, `gear`, `carb`, ROW_NUMBER() OVER (PARTITION BY `cyl` ORDER BY `mpg` DESC) AS `row_number`
# FROM (SELECT *
# FROM `mtcars`
# ORDER BY `mpg` DESC) `dbplyr_001`) `dbplyr_002`
# WHERE (`row_number` = 1.0)
Here we see that the function row_number() does translate; since it is a ranking function mimicking those described in the SQL2003 standard (see ?ranking), dbplyr knows the equivalent SQL code.
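If you want to check a translation without building a whole query, dbplyr::translate_sql() can preview it for a single expression. Here is a hedged sketch; simulate_hive() stands in for a real Spark connection, so sparklyr’s own translation may differ slightly.
# Preview the SQL generated for row_number() under our grouping and ordering
dbplyr::translate_sql(
  row_number(),
  vars_group = "cyl",
  vars_order = "mpg",
  con = dbplyr::simulate_hive()
)
With the translation in place, we can execute the full query and collect the results.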
mtcars_spark %>%
  group_by(cyl) %>%
  arrange(desc(mpg)) %>%
  mutate(row_number = row_number()) %>%
  filter(row_number == 1) %>%
  select(-row_number) %>%
  ungroup() %>%
  collect()
# # A tibble: 3 x 12
# car mpg cyl disp hp drat wt qsec vs am gear carb
# <chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
# 1 Hornet 4 Dr… 21.4 6 258 110 3.08 3.22 19.4 1 0 3 1
# 2 Toyota Coro… 33.9 4 71.1 65 4.22 1.84 19.9 1 1 4 1
# 3 Pontiac Fir… 19.2 8 400 175 3.08 3.84 17.0 0 0 3 2
So really, when it comes to using dplyr for data manipulation in Spark, it sometimes takes some persistence to find a function that translates nicely to Spark SQL, especially if you want to avoid the often slow spark_apply() function for applying an R function to a Spark object. I would recommend reading the sparklyr documentation, although it can often be a little light on the details; for a more detailed look at how to send R code to be executed on your Spark cluster, check out my colleague Jozef’s blog post.
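For completeness, here is a hedged sketch of what the spark_apply() route could look like; it ships a plain R function to each group, which is flexible but typically far slower than a native SQL translation.
# Run an ordinary R function on each cyl group inside Spark
mtcars_spark %>%
  spark_apply(
    function(df) df[which.max(df$mpg), ],
    group_by = "cyl"
  ) %>%
  collect()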
As a side note, should you not be able to find an R function that translates to SQL code, it is always worth checking out the list of Hive Operators and User-Defined Functions (UDFs). UDFs are built-in Hive functions designed for specific purposes, performing mathematical, arithmetic, logical and relational operations on table columns.
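Conveniently, dbplyr passes any function it does not recognise straight through to the backend untranslated, so such functions can be called directly from dplyr code. Here is a hedged sketch using the Hive/Spark SQL function regexp_extract() to pull the make out of the car name.
# regexp_extract() is not an R function; it is passed through to Spark SQL
mtcars_spark %>%
  mutate(make = regexp_extract(car, "^[^ ]+", 0L)) %>%
  select(car, make) %>%
  head(3) %>%
  collect()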
Conclusion
This concludes this series of blog posts, in which we have seen how to select a single row from a data.frame, data.table or tibble for each group, where a column in that group is at its maximum value for the group. In this post, we saw how easy this task is with dplyr’s combination of group_by() and slice(). We then saw how we can translate our dplyr code to be executed as SQL code on a Spark cluster, though not all dplyr “verbs” currently translate into SQL. To that end, when there is no direct translation of a dplyr function, it is often worth looking for a Hive User-Defined Function that performs the data manipulation task at hand.