Dave

Implement configuration-based ETL for Spark Streaming + Spark SQL

Posted: Oct 17, 2016 14:34
Abstract: Spark Streaming is well suited to ETL, but its programs are hard to modularize. This post presents a solution: a new way of developing Spark Streaming programs that enables modularized, configuration-based ETL and supports SQL-based data processing.
Project address
Preface
Traditional Spark Streaming programs require:
• Constructing the StreamingContext
• Setting the checkpoint
• Connecting to the data source
• Various transform operations
• Output via foreachRDD
In general, you end up with one huge program that walks through all of these steps, often hundreds of lines of code in a single main method (as sketched below). This is convenient enough for developing tiny features, but it does a poor job of module reuse and makes collaboration awkward. So we need support from a higher-level development kit.
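For reference, a minimal hand-written program covering these steps might look roughly like the following sketch. It targets the Spark 1.x streaming APIs; the broker address, topic name and checkpoint path are placeholders.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object TraditionalStreamingJob {
  def main(args: Array[String]): Unit = {
    // Construct the StreamingContext and set the checkpoint
    val conf = new SparkConf().setAppName("traditional-etl")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/checkpoint")

    // Connect to the data source (a direct Kafka stream)
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val lines = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("my-topic"))
      .map(_._2)

    // Various transform operations
    val cleaned = lines.filter(_.nonEmpty).map(_.trim)

    // Output via foreachRDD
    cleaned.foreachRDD(rdd => rdd.take(10).foreach(println))

    ssc.start()
    ssc.awaitTermination()
  }
}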
How to develop a Spark Streaming program
You only need to add a job configuration item like the one below to the configuration file, and the program can be submitted and run as a standard Spark Streaming program:
{

  "test": {
    "desc": "test",
    "strategy": "streaming.core.strategy.SparkStreamingStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor",
        "params": [
          {
            "metadata.broker.list":"xxx",
            "auto.offset.reset":"largest",
            "topics":"xxx"
          }
        ]
      },
      {
        "name": "streaming.core.compositor.spark.JSONTableCompositor",
        "params": [{"tableName":"test"}
        ]
      },
      {
        "name": "streaming.core.compositor.spark.SQLCompositor",
        "params": [{"sql":"select a from test"}
        ]
      },
      {
        "name": "streaming.core.compositor.RDDPrintOutputCompositor",
        "params": [
          {
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}


The above configuration is actually equivalent to completing the following procedures:
1. Consuming data from Kafka
2. Transforming the Kafka data into a table
3. Processing the data through SQL statements
4. Printing and outputting the data
Isn’t that easy? It also supports hot loading, dynamic job addition and so on.
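To make the mapping concrete, steps 2 through 4 correspond roughly to the following Spark 1.x operations on each micro-batch. This is only a sketch: jsonLines stands for the DStream produced by the Kafka compositor in step 1, and the table name and SQL text are taken from the configuration above.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

object ConfigEquivalent {
  // jsonLines: the DStream[String] obtained from Kafka (step 1)
  def runPipeline(jsonLines: DStream[String]): Unit = {
    jsonLines.foreachRDD { rdd: RDD[String] =>
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

      // Step 2: turn the JSON lines into a table named "test"
      sqlContext.read.json(rdd).registerTempTable("test")

      // Step 3: process the data with the configured SQL statement
      val result = sqlContext.sql("select a from test")

      // Step 4: print the output
      result.show()
    }
  }
}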
Features
The implementation has the following features:
1. Configuration-based
2. Supports configuration of multiple jobs
3. Supports various data source modules
4. Supports data processing through SQL statements
5. Supports multiple types of output modules
Extensions planned for the future include:
1. Dynamically adding, updating or deleting jobs without restarting Spark Streaming
2. Support for Storm and other streaming engines
3. Better interaction between multiple jobs
Configuration format description
The implementation is built entirely on ServiceframeworkDispatcher, and the core functionality took only around three hours to write.
First, we need to define a few concepts:
1. A Spark Streaming instance is defined as an App
2. Every action is defined as a job, and an App can contain multiple jobs
The configuration file structure is designed as follows:
{

  "job1": {
    "desc": "test",
    "strategy": "streaming.core.strategy.SparkStreamingStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor",
        "params": [
          {
            "metadata.broker.list":"xxx",
            "auto.offset.reset":"largest",
            "topics":"xxx"
          }
        ]
      }
    ],
    "configParams": {
    }
  },
  "job2":{
   ........
 }
}


A complete App corresponds to one configuration file. Each top-level configuration entry, such as job1 and job2, corresponds to a workflow, and all of them eventually run in a single App (one Spark Streaming instance).
• The strategy defines the call relationships between the compositor, algorithm and ref.
• The algorithm serves as the data source.
• The compositor is a link in the data-processing chain. In most cases, this is the interface we develop against.
• The ref is a reference to other jobs. With an appropriate strategy, multiple jobs can be organized into a new job.
• Every component (compositor, algorithm, strategy) supports parameter configuration.
The above covers the structure of the configuration file; ServiceframeworkDispatcher already provides a set of interface rules to follow when implementing the modules.
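For orientation, the contract a compositor implements, as implied by the code in the next section, looks roughly like the sketch below. The actual trait definitions live in the ServiceframeworkDispatcher project and may differ in detail; the Processor and Strategy stubs are included only so the sketch is self-contained.

import java.util

// Sketch only: the real traits are defined by ServiceframeworkDispatcher.
trait Processor[T]   // an "algorithm" module, i.e. a data source
trait Strategy[T]    // wires algorithm, ref and compositor together for a job

trait Compositor[T] {
  // Receives this module's "params" section from the configuration file
  def initialize(typeFilters: util.List[String],
                 configParams: util.List[util.Map[Any, Any]]): Unit

  // Consumes the previous module's output (middleResult), optionally shares
  // state with later modules through the mutable params map, and returns its
  // own output for the next module
  def result(alg: util.List[Processor[T]],
             ref: util.List[Strategy[T]],
             middleResult: util.List[T],
             params: util.Map[Any, Any]): util.List[T]
}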
Implementing the modules
So how are the corresponding modules implemented? In essence, the modules translate the configuration file above into a Spark Streaming program.
Take the implementation of SQLCompositor as an example:
// Imports are assumed here (the original snippet omits them); Compositor,
// Processor and Strategy come from the ServiceframeworkDispatcher framework.
import java.util

import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}

import scala.collection.JavaConversions._

class SQLCompositor[T] extends Compositor[T] {

  private var _configParams: util.List[util.Map[Any, Any]] = _
  val logger = Logger.getLogger(classOf[SQLCompositor[T]].getName)

  // The strategy engine (ServiceFrameStrategy) calls this method to pass the configuration in.
  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  // Obtain the configured SQL statement
  def sql = {
    _configParams(0).get("sql").toString
  }

  def outputTable = {
    _configParams(0).get("outputTable").toString
  }

  // The main execution method. In general, it obtains the SQLContext (with the corresponding
  // table already registered) from the previous module, then runs the query configured for
  // this module and gets a new DataFrame.
  // The T in middleResult is actually the DStream, which is passed on to the next module (the output module).
  // The params map lets modules share information; here we pass the processing function on to the next module.
  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
    var dataFrame: DataFrame = null
    val func = params.get("table").asInstanceOf[(RDD[String]) => SQLContext]
    params.put("sql", (rdd: RDD[String]) => {
      val sqlContext = func(rdd)
      dataFrame = sqlContext.sql(sql)
      dataFrame
    })
    middleResult
  }
}
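
To see how the shared params map is consumed downstream, an output module could pick up the function stored under "sql" and apply it inside foreachRDD. The following is only a hypothetical sketch of such a module; the project's actual RDDPrintOutputCompositor may be implemented differently.

import java.util

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.streaming.dstream.DStream

// Hypothetical sketch, not the project's actual output compositor.
class PrintOutputCompositorSketch[T] extends Compositor[T] {

  private var _configParams: util.List[util.Map[Any, Any]] = _

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
    // middleResult(0) is the DStream handed down the module chain
    val dstream = middleResult.get(0).asInstanceOf[DStream[String]]
    // The function SQLCompositor stored under "sql": RDD[String] => DataFrame
    val runSql = params.get("sql").asInstanceOf[(RDD[String]) => DataFrame]
    dstream.foreachRDD { rdd: RDD[String] =>
      runSql(rdd).show()
    }
    middleResult
  }
}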


The SQLCompositor above completes the SQL module. So how can we implement a custom map function? We can refer to the following implementation:
// Imports are assumed here (the original snippet omits them); the ClassTag
// context bound on U is required by DStream.map.
import java.util

import org.apache.log4j.Logger
import org.apache.spark.streaming.dstream.DStream

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

abstract class MapCompositor[T, U: ClassTag] extends Compositor[T] {
  private var _configParams: util.List[util.Map[Any, Any]] = _
  val logger = Logger.getLogger(classOf[MapCompositor[T, U]].getName)

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
    // Apply the user-defined per-line transformation to the incoming DStream
    val dstream = middleResult(0).asInstanceOf[DStream[String]]
    val newDstream = dstream.map(f => parseLog(f))
    List(newDstream.asInstanceOf[T])
  }

  // Subclasses implement the per-line parsing logic
  def parseLog(line: String): U
}

class YourCompositor[T, U: ClassTag] extends MapCompositor[T, U] {

  override def parseLog(line: String): U = {
    ??? // ... your logic
  }
}


Similarly, you can implement filter, repartition and other functions.
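
For example, a filter variant could follow the same skeleton as MapCompositor; the sketch below only swaps map for filter and leaves the predicate to the subclass.

import java.util

import org.apache.spark.streaming.dstream.DStream

// Sketch: the same pattern as MapCompositor, applied to filtering.
abstract class FilterCompositor[T] extends Compositor[T] {
  private var _configParams: util.List[util.Map[Any, Any]] = _

  override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {
    val dstream = middleResult.get(0).asInstanceOf[DStream[String]]
    val newDstream = dstream.filter(line => keep(line))
    util.Arrays.asList(newDstream.asInstanceOf[T])
  }

  // Subclasses decide which lines to keep
  def keep(line: String): Boolean
}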
Summary
This approach provides a higher-level API abstraction: you only need to care about your specific processing logic, not about how Spark is used. At the same time, the configuration-based system makes it easy to build data-processing pipelines and to reuse existing modules, and it also supports SQL-based data processing.