[Share] Analysis on Spark 2.0 Structured Streaming

Posted time: Nov 11, 2016 16:53
Preface
Spark 2.0 unifies stream computing under the DataFrame abstraction and introduces the concept of Structured Streaming. It maps a data source to a table of infinite length and maps the result of the stream computation to another table, so that stream data is manipulated in a fully structured way and Spark SQL's Catalyst engine is reused.
Before Spark 2.0
As the streaming component of the Spark platform, Spark Streaming has its own separate set of abstractions and APIs, roughly as follows:

Image source: Spark official website


The code looks roughly like this:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
// Construct the StreamingContext with a 1-second batch interval
val ssc = new StreamingContext(conf, Seconds(1))

// Obtain the input source
val lines = ssc.socketTextStream("localhost", 9999)

// Computation logic
val wordCounts = lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

wordCounts.print()

// Start the stream computation
ssc.start()
ssc.awaitTermination()


The above is all boilerplate, and you have to follow it every time.
In the Spark 2.0 era
Conceptually, a stream is nothing more than an infinitely growing table. The official documentation provides a clear figure:
Image source: the official website

Similar code appeared in earlier promotional slides, and it is quite refreshing. Of course, the snippets below need their surrounding context; a single line of them will not run on its own.



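The original post shows the two snippets as screenshots, which are not reproduced here. Below is a minimal sketch of the comparison they draw, assuming a SparkSession named spark, a JSON source at the illustrative path /tmp/dir, and the schemaExp schema defined in the full example further down:

// Batch: standard DataFrame code using read
val batchDf = spark.read.format("json").load("file:///tmp/dir")
batchDf.groupBy("name").count().show()

// Streaming: the same logic, with read replaced by readStream;
// results only flow once a writeStream query is started (see the full example below).
val streamDf = spark.readStream.format("json").schema(schemaExp).load("file:///tmp/dir")
streamDf.groupBy("name").count()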
The first is standard DataFrame code; the second is its stream-computing counterpart. After seeing this demo, you are probably left with a few questions:
1. Isn't there supposed to be a timer? Where do I set the batch duration?
2. Where do I set awaitTermination?
3. If I want to write the results to another engine, what happens when no adapter is available for it?
These questions can be summarized as:
What is the full routine of Structured Streaming?
Let’s look at the code (the example is from the Spark source code and I made some edits):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import scala.concurrent.duration._

val spark = SparkSession.builder
  .master("local[2]")
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

val schemaExp = StructType(
  StructField("name", StringType, false) ::
    StructField("city", StringType, true) :: Nil
)

// Standard DataSource API; only read is changed to readStream.
val words = spark.readStream.format("json").schema(schemaExp)
  .load("file:///tmp/dir")

// Ordinary DataFrame APIs.
val wordCounts = words.groupBy("name").count()

// Standard DataSource writing API; only write is changed to writeStream.
val query = wordCounts.writeStream
  // complete, append, update. Currently only the first two are supported.
  .outputMode("complete")
  // Sink types: console, parquet, memory, and foreach.
  .format("console")
  // This is where the timer (trigger interval) is set.
  .trigger(ProcessingTime(5.seconds))
  .start()

query.awaitTermination()


This is the complete routine of Structured Streaming.
Structured Streaming currently only supports the File and Socket sources. It can write to the four sink types mentioned above, and the foreach sink can be extended without limit. For example:
import java.io.{File, FileWriter}

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ForeachWriter, Row}

val query = wordCounts.writeStream
  .trigger(ProcessingTime(5.seconds))
  .outputMode("complete")
  .foreach(new ForeachWriter[Row] {

    var fileWriter: FileWriter = _

    override def process(value: Row): Unit = {
      fileWriter.append(value.toSeq.mkString(","))
    }

    override def close(errorOrNull: Throwable): Unit = {
      fileWriter.close()
    }

    override def open(partitionId: Long, version: Long): Boolean = {
      // One temporary directory per partition; open a writer into it.
      FileUtils.forceMkdir(new File(s"/tmp/example/${partitionId}"))
      fileWriter = new FileWriter(new File(s"/tmp/example/${partitionId}/temp"))
      true
    }
  }).start()


At the end I simply write the data into a temporary directory on each node. This is of course just an example, but writing to stores such as Redis follows the same pattern.
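For instance, here is a minimal sketch of such a Redis writer, assuming the Jedis client and that the rows carry the name and count columns produced by the earlier aggregation:

import org.apache.spark.sql.{ForeachWriter, Row}
import redis.clients.jedis.Jedis

val redisWriter = new ForeachWriter[Row] {

  var jedis: Jedis = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // One connection per partition and trigger.
    jedis = new Jedis("localhost", 6379)
    true
  }

  override def process(value: Row): Unit = {
    // Store each aggregated count under its name.
    jedis.set(value.getAs[String]("name"), value.getAs[Long]("count").toString)
  }

  override def close(errorOrNull: Throwable): Unit = {
    jedis.close()
  }
}

You would then pass it to .foreach(redisWriter) exactly as in the file-based example above.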
But API changes are not the full story of Structured Streaming.
If Structured Streaming only changed the API or merely added DataFrame support, it would be a disappointment, because DataFrame operations were already reasonably well supported before Spark 2.0 through various wrappers. So what is the real significance of Structured Streaming?
• It re-abstracts stream computing.
• It makes exactly-once semantics easier to achieve.
We know that Spark Streaming before Spark 2.0 could achieve at-least-once delivery, but exactly-once semantics were hard to guarantee at the framework level. For details, you can refer to my article Spark Streaming Crash - How to Ensure Exactly Once Semantics. With the re-designed stream computing framework, exactly-once semantics become much easier to achieve.
You may notice that Structured Streaming introduces an additional outputMode with three values: complete, append, and update. The current version only implements the first two.
1. The complete mode means you can get the full computing results after every computation.
2. The append mode means you can get the incremental computing results after every computation.
However, the complete mode is only available when aggregate functions are used, and the append mode is only available for map- or filter-style operations. Can you see why?
The complete mode is effectively the mapWithState behavior we mentioned earlier, while the append mode is plain record-by-record parsing and processing of the data, without complex aggregate statistics.
The official documentation provides a chart of the complete mode:

Image source: the official website
The append mode, by contrast, returns only the latest transformed data.
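To make the distinction concrete, here is a minimal sketch reusing the words DataFrame from the example above (the column names and paths are the same assumptions as before):

// Aggregation query: the result table is recomputed in full each trigger, so complete mode applies.
words.groupBy("name").count()
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Pure projection/filter query: each trigger only appends new rows, so append mode applies.
words.select("name", "city").where("city IS NOT NULL")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()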
We mentioned earlier that the design is very simple: it is nothing more than mapping an infinitely large Source Table to an infinitely large Result Table, with the Result Table updated after each cycle. We can see that Structured Streaming takes over the pipeline end to end and can ensure data integrity and reliability through its internal mechanisms:
• The concept of offsets is incorporated into the stream computation.
• For data sources that cannot be replayed, a WAL (write-ahead log) is used.
• The concept of state encapsulates the state of every partition of the result table; every ADD, PUT, UPDATE, and DELETE operation on a partition is written to HDFS to facilitate system recovery.
Of these, only the third is present in version 2.0. The pity is that the result table and the ForeachWriter do not cooperate: the system only guarantees the integrity of the result table, which it persists to HDFS through HDFSBackedStateStoreProvider.
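All of this bookkeeping hangs off the checkpoint location you configure on the query; a minimal sketch, with an illustrative path:

// Offsets and the HDFS-backed state store are kept under the checkpoint directory,
// which is what makes recovery after a failure possible.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/structured-streaming/checkpoint")
  .trigger(ProcessingTime(5.seconds))
  .start()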
In the past, the API handed you an iterator over the partition and let you process it however you liked. Things have changed now; take the ForeachWriter as an example:
override def process(value: Row): Unit = {

You can now only process the data record by record. In theory, if the system fails while processing is in progress, data can be lost. But if Structured Streaming runs in complete mode, you only need to make the write an overwrite, since the output is the full data set each time; in other words, the operation is idempotent.
If Structured Streaming runs in append mode, only at-least-once can be ensured for the external sink. Internally, that is for the result table, the system can still ensure exactly-once. For example, if the target database supports transactions, you can commit the transaction when the ForeachWriter is closed; should anything fail, you simply roll back from the close point. For stores such as HBase and Redis, however, things are tougher.
In addition, for the initialization function provided by ForeachWriter,
override def open(partitionId: Long, version: Long): Boolean = {

the return value is a Boolean. By checking the version number, we can decide whether to skip processing this partition: returning true means do not skip, returning false means skip. In open you can persist the version information in some way; on system recovery you read it back, return false for versions lower than the current one, and continue processing when the version matches.
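Here is a minimal sketch of that idea; loadLastVersion and saveVersion are hypothetical helpers that you would back with your own durable store:

import org.apache.spark.sql.{ForeachWriter, Row}

val reliableWriter = new ForeachWriter[Row] {

  private var currentPartition: Long = _
  private var currentVersion: Long = _

  override def open(partitionId: Long, version: Long): Boolean = {
    currentPartition = partitionId
    currentVersion = version
    // loadLastVersion is a hypothetical helper that reads the last version
    // successfully processed for this partition from some durable store.
    if (version <= loadLastVersion(partitionId)) {
      false // this version was already processed: skip the partition
    } else {
      true // a new version: process it
    }
  }

  override def process(value: Row): Unit = {
    // write the record to the external system here
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) {
      // saveVersion is a hypothetical helper that persists the version just handled
      // (or commits a transaction), so a recovered query can skip it next time.
      saveVersion(currentPartition, currentVersion)
    }
  }

  // Placeholder implementations; replace them with your own durable storage.
  private def loadLastVersion(partitionId: Long): Long = -1L
  private def saveVersion(partitionId: Long, version: Long): Unit = ()
}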