
Extending sparklyr: Data Types

TL;DR

sparklyr maps R data types and data storage types to Scala, but it doesn't handle every data storage type. This blog post discusses how to generate, from the R side, Scala data storage types that sparklyr does not create for you. You can do this by using the sparklyr::invoke_new function to create the object you want in Java or Scala, for example a java.util.ArrayList, and then using sparklyr::invoke to call methods on that object to add data to it or convert it to the type you need. Read on to see how to deal with different data storage types from the Scala side, or skip ahead to see how to generate Scala data storage types from R.

Introduction

When working to extend the sparklyr package, for example to call custom Scala libraries, you will often come across Scala methods which require data storage types different from those automatically handled by sparklyr. When using the invoke family of functions, R data types map to Scala data types, but sparklyr currently only handles certain R data storage type mappings. The table below shows the data mappings currently handled by sparklyr:

R Type     Scala Type
logical    Boolean
numeric    Double
integer    Integer
character  String
list       Array

Table 1: R to Scala type mappings available in sparklyr

So Scala functions with parameters that require a List or a Seq, for example, need to be handled in a different way. There are two ways we can approach this problem: from the Scala side and from the R side. We will explore both approaches.
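To make the problem concrete, here is a minimal Scala sketch of the kind of method we have in mind; the object and method names (MyMethods, scaleValues) are invented for illustration and are not part of sparklyr:

object MyMethods {
  // Hypothetical method whose parameter is a Seq; sparklyr cannot build a
  // Seq directly from an R object, so invoke cannot call this with list().
  def scaleValues(values: Seq[Double], factor: Double): Seq[Double] =
    values.map(_ * factor)
}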

Other Scala Data Types

Solutions from the Scala Side

There are several ways that this issue can be overcome from the Scala side; here we highlight three: changing the data type used within Scala, using overloading, and defining a separate Scala class to be called from R. These are discussed in detail below.

Using Different Data Types in Scala

One obvious way we could fix this problem is to rewrite the Scala code to use a different parameter type in the Scala method. For example, we could use an Array, which would only require us to pass a list() on the R side. However, this is not ideal if your project is large, has lots of legacy code and uses other APIs such as PySpark; you may end up changing a lot of code.
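As a sketch of what such a rewrite might look like (reusing the hypothetical scaleValues method from above), the parameter becomes an Array, which sparklyr can populate from an R list():

object MyMethods {
  // Rewritten to accept an Array, which sparklyr maps an R list() to;
  // the body of the method is otherwise unchanged.
  def scaleValues(values: Array[Double], factor: Double): Array[Double] =
    values.map(_ * factor)
}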

Using Overloading

We can instead use overloading, which allows us to define methods of the same name, in the same class, but with different parameter or data types. This approach comes with its own issues, though; for one, we would need to write additional tests for the additional methods. You can think of overloading as working like R's S3 methods: for S3 methods, the behaviour changes based on the class of the object passed in.
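A minimal sketch of this approach, again using the hypothetical scaleValues method, might look as follows; the Array overload simply converts its argument and delegates to the Seq version:

object MyMethods {
  // Original method, taking the Seq used by the rest of the code base.
  def scaleValues(values: Seq[Double], factor: Double): Seq[Double] =
    values.map(_ * factor)

  // Overload of the same name that accepts the Array sparklyr sends for an
  // R list(), converts it, and delegates to the original method.
  def scaleValues(values: Array[Double], factor: Double): Seq[Double] =
    scaleValues(values.toSeq, factor)
}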

Defining A New Scala Class

To avoid the possible issues of overloading, we can define two separate classes, myClass and myClassR. These both call upon the same underlying implicit class, which does the bulk of the work for the method. The difference is in the data types that are passed to myClass and myClassR: myClass takes the data type you want to use, whereas myClassR takes the data type passed to it by sparklyr and converts it before calling the implicit. Of course, this approach effectively doubles our code and is therefore rather wasteful; we would also, again, need to write additional tests for the new class.
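A minimal sketch of this layout might look as follows; the class and method names are again invented, and the implicit class simply holds the shared implementation that both entry points delegate to:

// Shared implementation lives in an implicit class so that both entry
// points remain thin wrappers around the same code.
object MyImplicits {
  implicit class RichValues(val values: Seq[Double]) {
    def scaled(factor: Double): Seq[Double] = values.map(_ * factor)
  }
}

import MyImplicits._

// Entry point taking the natural Scala type, for Scala (or PySpark) callers.
object myClass {
  def scaleValues(values: Seq[Double], factor: Double): Seq[Double] =
    values.scaled(factor)
}

// Entry point for sparklyr, taking the Array produced from an R list() and
// converting it before handing off to the shared implementation.
object myClassR {
  def scaleValues(values: Array[Double], factor: Double): Seq[Double] =
    values.toSeq.scaled(factor)
}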

Solutions from the R Side

Generating Scala Data Storage Types from R

We can actually forgo any changes on the Scala side of our code by generating what we need on the R side. Suppose, as an example, that we want to generate a Scala Seq. First we create a Java ArrayList in the JVM and incrementally add data to it using the following code:

library(sparklyr)
sc <- spark_connect(master = "local")
# add each element of an R vector `x` to a java.util.ArrayList in the JVM
x <- c(1, 2, 3)
al <- invoke_new(sc, "java.util.ArrayList")
lapply(x, FUN = function(y){invoke(al, "add", y)})

Note that we don’t need to reassign the result of the lapply because each call adds a value to the ArrayList that lives in the JVM. Then, using the Scala JavaConversions package, we convert the ArrayList to a Buffer and then to a Seq.

invoke_static(sc, "scala.collection.JavaConversions", "asScalaBuffer", al) %>%
  invoke("toSeq")

Putting this all together in a function gives:

scala_seq <- function(sc, x) {
  al <- invoke_new(sc, "java.util.ArrayList")
  lapply(
    x,
    FUN = function(y) {
      invoke(al, "add", y)
    }
  )
  invoke_static(sc, "scala.collection.JavaConversions", "asScalaBuffer", al) %>%
    invoke("toSeq")
}

Calling this function returns a reference to the Scala object (spark_jobj).

# connect to spark
sc <- sparklyr::spark_connect(master = "local")

# create a scala seq object
scala_seq(sc, c(1, 2, 3))
# <jobj[16]>
#   scala.collection.convert.Wrappers$JListWrapper
#   Buffer(1.0, 2.0, 3.0)

You will note that the output tells us we have created a Buffer, but Buffer and List both belong to the same category of sequence types. If what we actually needed was a List object, we simply have to invoke the toList method on the Seq (or Buffer) object. The function below shows this in action; again, it returns a reference to the Scala object (spark_jobj).

scala_list <- function(sc, x) {
  scala_seq(sc, x) %>%
    invoke("toList")
}
# create a scala list
scala_list(sc, c(1, 2, 3))
# <jobj[21]>
#   scala.collection.immutable.$colon$colon
#   List(1.0, 2.0, 3.0)

# disconnect the spark connection
spark_disconnect(sc = sc)
# NULL

Similar results can be achieved for other data types. These new data storage types can now be used in Scala function calls when extending sparklyr: we simply generate the data in the JVM and pass the resulting reference to the method we invoke.