
Implement REST DataSource using Spark DataSource API

Posted time: Oct 19, 2016 13:27 PM
Abstract:  The Spark DataSource API lets a wide range of data sources be adapted to a common specification, so that Spark's computing capability can be applied to them efficiently.  Typical implementations include Parquet, CarbonData, and PostgreSQL (through JDBC).  This article introduces how to read standard REST interfaces using the Spark DataSource API.
First, let me explain where this requirement comes from.  The main flow of a stream computation usually relies on a lot of mapping data, such as lookup (comparison) relationships. Such mapping data is usually exposed through an HTTP interface. This is especially true for external systems, where you generally have no way to read the database directly through JDBC.
That is the first reason. The second is that I need to flatten the JSON data read from HTTP interfaces.  Currently, SQL processing of JSON data can handle simple nesting, but it is inefficient for more complicated structures.
Take the format below for example:


It is better to expand the code above to the following format to make it directly usable for the main flow:

To meet my colleagues' requirements, I need to, first, implement the standard DataSource API to GET REST interfaces and, second, provide a configurable module that applies the merging rules above.
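As an illustration of what flattening nested data can mean, here is a minimal sketch in plain Scala. The record, its field names, and the dotted-key convention are assumptions made for this example; they are not the merging rules of the module described in this article.

```scala
object FlattenSketch {
  // Recursively flattens nested maps into one level, joining key paths with dots.
  def flatten(record: Map[String, Any], prefix: String = ""): Map[String, Any] =
    record.flatMap {
      case (key, nested: Map[_, _]) =>
        flatten(nested.asInstanceOf[Map[String, Any]], s"$prefix$key.")
      case (key, value) =>
        Map(s"$prefix$key" -> value)
    }

  // Hypothetical nested record, as it might arrive from a REST interface.
  val example: Map[String, Any] = Map(
    "id" -> 1,
    "geo" -> Map("city" -> "Hangzhou", "pos" -> Map("lat" -> 30.27))
  )

  // Yields Map("id" -> 1, "geo.city" -> "Hangzhou", "geo.pos.lat" -> 30.27)
  val flat: Map[String, Any] = flatten(example)
}
```

A flat record like this maps directly onto table columns, which is what makes it usable in the main flow.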
Implementation objective
First, let's see what DataSource API looks like:
val df = SQLContext.getOrCreate(sc).
  read.
  format("driver class"). // The driver class, similar to a JDBC driver class.
  options(Map(....)). // The additional parameters you need to pass to the driver.
  load("url") // The resource path.

Expressed as configuration, it becomes:
    {
        "name": "streaming.core.compositor.spark.source.SQLSourceCompositor",
        "params": [
            {
                "format": "",
                "url": "http://[your dns]/path",
                "xPath": "$.data"
            }
        ]
    }

DefaultSource implementation
class DefaultSource extends RelationProvider
  with DataSourceRegister

This follows a typical naming convention.  In the package name, rest means it supports REST interfaces and json means the REST interface returns JSON data, so the package name is self-explanatory.
First, let's look at the two interfaces that DefaultSource inherits.
• DataSourceRegister
This interface has a single method, shortName.  The package name above is long, so you can give the data source a shorter name:

So the specific implementation is changed to:
override def shortName(): String = "restJSON"

• RelationProvider
This interface also has only one method:
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation

The returned BaseRelation object describes the interaction between the DataSource and Spark SQL.  The createRelation method lets you create an appropriate BaseRelation implementation according to the user-defined parameters.
In fact, there are other provider interfaces alongside RelationProvider that carry more information, such as:

SchemaRelationProvider allows you to pass the schema information directly to the BaseRelation implementation.
HadoopFsRelationProvider not only adds the path to the parameters, but also requires the returned value to be a HadoopFsRelation. HadoopFsRelation provides most of the implementation needed for interacting with HDFS.
In our implementation, we only need to implement the basic RelationProvider.
Let's take a look at the specific code of DefaultSource.createRelation:
override def createRelation(
    sqlContext: SQLContext,
    // Do you still remember the options method of DataSource? These parameters are
    // passed by the user through the options method.
    parameters: Map[String, String]): BaseRelation = {
  // We don't need the user to provide the schema; we can infer it from the JSON data.
  // Inference involves the concept of a sampling ratio.
  val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
  // Do you still remember the DataSource path? In theory, the URL would be passed as the path.
  // Here, however, it is passed through the options method.
  val url = parameters.getOrElse("url", "")
  // We extract the data we need with an XPath-like (JSONPath) expression.
  // In the previous example, "$.data" selects the array that holds the data.
  val xPath = parameters.getOrElse("xPath", "$")
  // Here is the core.
  new RestJSONRelation(None, url, xPath, samplingRatio, None)(sqlContext)
}

The explanations are in the code comments.  RestJSONRelation is the core here: it implements the interaction between Spark SQL and the DataSource.  RestJSONRelation inherits from BaseRelation and TableScan.
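The option handling in createRelation can also be separated from Spark and tested on its own. The sketch below is one possible variant, not the article's implementation: RestOptions and fromParameters are hypothetical names, and unlike the code above it rejects a missing url instead of defaulting to an empty string.

```scala
// Hypothetical holder for the options that createRelation reads.
final case class RestOptions(url: String, xPath: String, samplingRatio: Double)

object RestOptions {
  // Reads the same three keys as createRelation, with the same defaults for
  // xPath and samplingRatio, but fails fast when url is missing or blank.
  def fromParameters(parameters: Map[String, String]): RestOptions = {
    val url = parameters.get("url").map(_.trim).filter(_.nonEmpty).getOrElse(
      throw new IllegalArgumentException("Option 'url' is required"))
    val xPath = parameters.getOrElse("xPath", "$")
    val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
    require(samplingRatio > 0.0 && samplingRatio <= 1.0,
      s"samplingRatio must be in (0, 1], got $samplingRatio")
    RestOptions(url, xPath, samplingRatio)
  }
}
```

Failing early on a missing url surfaces configuration mistakes when the relation is created rather than when the first HTTP request is sent.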
RestJSONRelation
Let's look at the signature of RestJSONRelation:
private[sql] class RestJSONRelation(
                           val inputRDD: Option[RDD[String]],
                           val url: String,
                           val xPath: String,
                           val samplingRatio: Double,
                           val maybeDataSchema: Option[StructType]
                           )(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

You can define these parameters however you like.  The meanings of url, xPath, and samplingRatio were introduced in the previous section.
Two pieces of information are necessary for interaction with the DataSource:
1. Schema.  There are only two ways to get the schema: the user supplies it, or the program infers it from the data. BaseRelation also defines several related conventions:
• needConversion: whether type conversion is required. Spark SQL uses its own row representation internally, so the values in a row must be of specific internal types; for example, String is converted to UTF8String. The option is true by default, and the official recommendation is to leave it as is.
• unhandledFilters: returns the filters that the DataSource cannot push down, so that Spark knows it must still apply them itself. Otherwise, Spark assumes you have already performed the filtering, and the results will be wrong.
2. Data scanning method.  At present, Spark SQL provides four types of data scans:
• TableScan: a full-table scan.
• PrunedScan: you specify the required columns, and the data source does not return the other columns.
• PrunedFilteredScan: you specify the required columns and add filter conditions, and only the data satisfying the conditions is returned. This is the so-called pushdown operation of the DataSource.
• CatalystScan: similar to PrunedFilteredScan in that it supports column pruning and data filtering, but the filter conditions it accepts are Spark expressions. It is more flexible in theory, but I did not find a concrete implementation of it in the Spark source code (V1.6.1).
Here we only need to implement a simple TableScan, because our data is dictionary data and no filtering is required.
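To make the difference between the first three scan types concrete, here is a toy model in plain Scala. It does not use Spark's classes: the table, the column names, and the single equality filter are assumptions for illustration, and real implementations return RDD[Row] and receive Spark's Filter objects instead.

```scala
object ScanSketch {
  type Row = Vector[Any]

  // Hypothetical dictionary table.
  val columns: Vector[String] = Vector("code", "name", "region")
  val rows: Vector[Row] = Vector(
    Vector("a1", "alpha", "east"),
    Vector("b2", "beta", "west"),
    Vector("c3", "gamma", "east"))

  // Resolves column names to positions, rejecting unknown columns.
  private def indicesOf(requiredColumns: Seq[String]): Vector[Int] = {
    val indices = requiredColumns.map(name => columns.indexOf(name)).toVector
    require(!indices.contains(-1), "unknown column requested")
    indices
  }

  // TableScan: return every column of every row.
  def tableScan(): Vector[Row] = rows

  // PrunedScan: return only the required columns, in the requested order.
  def prunedScan(requiredColumns: Seq[String]): Vector[Row] = {
    val indices = indicesOf(requiredColumns)
    rows.map(row => indices.map(i => row(i)))
  }

  // PrunedFilteredScan: also apply a filter at the source before pruning.
  // A single equality condition stands in for the pushed-down filters.
  def prunedFilteredScan(requiredColumns: Seq[String], column: String, value: Any): Vector[Row] = {
    val filterIndex = indicesOf(Seq(column)).head
    val indices = indicesOf(requiredColumns)
    rows.filter(row => row(filterIndex) == value).map(row => indices.map(i => row(i)))
  }
}
```

For a small dictionary table, the full scan returns so little data that pruning and pushdown bring no practical benefit, which is why TableScan is sufficient here.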
Schema deduction
BaseRelation requires you to provide the schema. Here, we first define a lazy dataSchema attribute so that the schema is not inferred again every time the schema method is called.
override def schema: StructType = dataSchema
lazy val dataSchema = .....

Because we need to infer the schema from the data, we must first obtain the data. Let's define a method:
private def createBaseRdd(inputPaths: Array[String]): RDD[String]

inputPaths: I carried this name over from the file-system data sources; here it actually holds a URL. All data that Spark SQL ultimately reads is an RDD, so we return RDD[String] here as well. The implementation is simple: fetch the data through HttpClient according to inputPaths, and then call makeRDD.
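// It would be better to add a retry mechanism.
private def createBaseRdd(inputPaths: Array[String]): RDD[String] = {
    val url = inputPaths.head
    val res = Request.Get(new URL(url).toURI).execute()
    val response = res.returnResponse()
    val content = EntityUtils.toString(response.getEntity)
    if (response != null && response.getStatusLine.getStatusCode == 200) {
      // Extract the data array selected by xPath.
      // Assumption: JSONPath.read(content, xPath) stands in for the JSONPath extraction call.
      import scala.collection.JavaConversions._
      val extractContent = JSONArray.fromObject(JSONPath.read(content, xPath)).
        map(f => JSONObject.fromObject(f).toString).toSeq
      // Turn the extracted JSON strings into an RDD, as described above.
      sqlContext.sparkContext.makeRDD(extractContent)
    } else {
      // Assumed fallback: an empty RDD when the request does not succeed.
      sqlContext.sparkContext.makeRDD(Seq.empty[String])
    }
}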
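The retry mechanism suggested in the comment could look like the following minimal sketch. Retry.withRetry and its parameters are hypothetical names introduced for this example, and the fixed delay between attempts is a simplification (a production version might back off exponentially).

```scala
import scala.util.{Failure, Success, Try}

object Retry {
  // Runs `op` up to `maxAttempts` times, sleeping `delayMillis` after each failed attempt.
  // If every attempt fails, the last exception is rethrown.
  def withRetry[T](maxAttempts: Int, delayMillis: Long)(op: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    Try(op) match {
      case Success(value) => value
      case Failure(_) if maxAttempts > 1 =>
        Thread.sleep(delayMillis)
        withRetry(maxAttempts - 1, delayMillis)(op)
      case Failure(error) => throw error
    }
  }
}
```

The HTTP call in createBaseRdd could then be wrapped as Retry.withRetry(3, 500L) { ... }, so that a transient network error does not fail the whole job.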

With this method, we can obtain data for schema inference:
lazy val dataSchema = {
    // We also allow users to pass the schema to us. Otherwise we infer the schema on our own.
    val jsonSchema = maybeDataSchema.getOrElse {      
        //Get the data
       //Sampling rate, that is, the sc.sample method.


The implementation logic of InferSchema is comparatively complicated, but its ultimate goal is to return StructType(fields: Array[StructField]). I copied the implementation from the Spark JSON DataSource directly; if you are interested, you can study it yourself. StructType itself is simple: it is nothing more than a structure describing the schema. Just as when defining a table, you give the system a list of fields, each with a name, a type, and whether the value may be null.
By now we have finally completed the table structure.
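The core idea of schema inference, finding for each field a type that fits every sampled record, can be sketched without Spark. The FieldType objects below are toy stand-ins for Spark's data types, and the merge rules are a simplified assumption; the real InferSchema also handles nested objects, arrays, and corrupt records.

```scala
object InferSketch {
  // Toy stand-ins for Spark's DataType hierarchy.
  sealed trait FieldType
  case object NullT extends FieldType
  case object BooleanT extends FieldType
  case object LongT extends FieldType
  case object DoubleT extends FieldType
  case object StringT extends FieldType

  // Maps a single parsed value to its toy type.
  def typeOf(value: Any): FieldType = value match {
    case null => NullT
    case _: Boolean => BooleanT
    case _: Int | _: Long => LongT
    case _: Float | _: Double => DoubleT
    case _ => StringT
  }

  // Finds a type that can hold both inputs, falling back to StringT on conflict.
  def merge(a: FieldType, b: FieldType): FieldType = (a, b) match {
    case (x, y) if x == y => x
    case (NullT, other) => other
    case (other, NullT) => other
    case (LongT, DoubleT) | (DoubleT, LongT) => DoubleT
    case _ => StringT
  }

  // Infers one type per field name across all sample records.
  def inferSchema(records: Seq[Map[String, Any]]): Map[String, FieldType] =
    records.foldLeft(Map.empty[String, FieldType]) { (schema, record) =>
      record.foldLeft(schema) { case (acc, (name, value)) =>
        val inferred = typeOf(value)
        acc.updated(name, acc.get(name).map(merge(_, inferred)).getOrElse(inferred))
      }
    }
}
```

A sampling ratio below 1.0 simply means that only a subset of the records is passed to this kind of inference, trading accuracy for speed.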
Data access
We mentioned four ways of scanning data above. Here we use TableScan, which requires implementing only one method, buildScan:
def buildScan(): RDD[Row] = {
      dataSchema,      sqlContext.conf.columnNameOfCorruptRecord).asInstanceOf[RDD[Row]]

The essence is to convert the JSON string to the Row format according to the schema we have.
The specific practices are as follows:
// json is the RDD[String] returned by createBaseRdd.
// Each string is in JSON format.
// Process each partition.
json.mapPartitions { iter =>
      val factory = new JsonFactory()
      iter.flatMap { record =>
        try {
          // JSON parser.
          val parser = factory.createParser(record)
          parser.nextToken()
          // Type conversion starts here.
          // The branch bodies below follow Spark 1.6's JacksonParser.
          convertField(factory, parser, schema) match {
            case null => failedRecord(record)
            case row: InternalRow => row :: Nil
            case array: ArrayData =>
              if (array.numElements() == 0) {
                Nil
              } else {
                array.toArray[InternalRow](schema)
              }
            case _ =>
              sys.error(
                s"Failed to parse record $record. Please make sure that each line of the file " +
                  "(or each string in the RDD) is a valid JSON object or an array of JSON objects.")
          }
        } catch {
          case _: JsonProcessingException =>
            failedRecord(record)
        }
      }
}

The code here is clear and easy to follow. One thing is confusing, though: the result of convertField(factory, parser, schema) is matched against InternalRow. How can converting a field produce an InternalRow? The answer is subtle. Let's look at the convertField method:
private[sql] def convertField(
      factory: JsonFactory,
      parser: JsonParser,
      schema: DataType): Any = {
    import com.fasterxml.jackson.core.JsonToken._
    (parser.getCurrentToken, schema) match {
      case (null | VALUE_NULL, _) =>
        null

      case (FIELD_NAME, _) =>
        // Move from the field name to its value, then convert the value.
        parser.nextToken()
        convertField(factory, parser, schema)

      // ... cases for the other token and type combinations are omitted here ...

      case (START_OBJECT, st: StructType) =>
        convertObject(factory, parser, st)
    }
}

If your JSON is a map (a JSON object), the match falls through the earlier cases and reaches the last one shown, START_OBJECT. The st: StructType is the dataSchema we inferred earlier. The convertObject method is as follows:
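val row = new GenericMutableRow(schema.length)
while (nextUntil(parser, JsonToken.END_OBJECT)) {
      schema.getFieldIndex(parser.getCurrentName) match {
        case Some(index) =>
          row.update(index, convertField(factory, parser, schema(index).dataType))
        case None =>
          // Field not in the schema: skip its value (as in Spark 1.6's JacksonParser).
          parser.skipChildren()
      }
}
row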

Now everything is clear. To read a complete object, the code loops until it meets END_OBJECT, the token that marks the end of a JSON object (map).  In every iteration, one field is read, and its type is looked up in the schema by field name. Then convertField is called recursively to convert the field to the representation the row requires; for example, character strings are converted to UTF8String.
case (VALUE_STRING, StringType) =>  UTF8String.fromString(parser.getText)
The converted value is written into the row with the row.update method. Once END_OBJECT is reached, the conversion of a JSON map to a Row is complete.
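The same schema-driven filling of a row can be shown in miniature with plain Scala collections. This is only a sketch under stated assumptions: the three-column schema and the toRow helper are hypothetical, and the real code works on a streaming Jackson parser and an InternalRow rather than on a list of already parsed fields.

```scala
object RowSketch {
  // Hypothetical schema: field names in column order.
  val schema: Vector[String] = Vector("code", "name", "region")

  // Fills a row by looking up each parsed field's index in the schema.
  // Fields absent from the schema are skipped; fields missing from the input stay null.
  def toRow(fields: Seq[(String, Any)]): Vector[Any] = {
    val row = Array.fill[Any](schema.length)(null)
    fields.foreach { case (name, value) =>
      schema.indexOf(name) match {
        case -1 => () // unknown field: skip it
        case index => row(index) = value
      }
    }
    row.toVector
  }
}
```

Because values are placed by schema index, the order of fields in the incoming JSON does not matter.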
Finishing up
By now, we have completed all the pieces, and you should be able to use the data source as follows:
val df = SQLContext.getOrCreate(sc).
  read.
  format(""). // The driver class, similar to a JDBC driver class.
  options(Map(
    "url" -> "http://[your dns]/path",
    "xPath" -> "$.data"
  )). // The additional parameters you need to pass to the driver.
  load("url") // The resource path.
The result is a DataFrame, on which you can perform any operations you need.

The Spark DataSource API brings a huge benefit to the Spark ecosystem. Various storage systems can implement a uniform, standard interface to connect to Spark. Knowing how to implement a data source helps your storage system integrate better with the ecosystem and opens the door to better performance optimization.