All Products
Search
Document Center

E-MapReduce:Stream ingestion

Last Updated:Aug 05, 2024

In most cases, systems that support stream ingestion write streaming data in small batches to the storage system as small files and periodically merge the small files. For example, Hive and Delta Lake ingest streaming data in the preceding manner. Kudu also supports stream ingestion. However, Kudu has its own storage architecture and is not built on top of a big data storage system. This topic describes how to ingest streaming data from Kafka.

Prerequisites

  • A custom cluster or a DataLake cluster that contains the Delta Lake service is created in the E-MapReduce (EMR) console. For more information, see Create a cluster.

  • A Dataflow cluster that contains the Kafka service is created. For more information, see Create a cluster.

Limits

The DataLake cluster or custom cluster and the Dataflow cluster must be deployed in the same vSwitch of the same virtual private cloud (VPC).

Evolution of stream ingestion

Phase

Description

Previously

To ingest streaming data, you must manually partition a fact table by time in a fine-grained manner. For example, you can create a partition every 5 minutes. After data is written to a partition, you must execute the INSERT OVERWRITE statement to merge small files in the partition into one or more large files and write the large files back to the partition. However, the following issues may occur:

  • Reads and writes are not isolated, which easily causes read failures or data accuracy issues.

  • Streaming jobs cannot guarantee the exactly-once semantics. If a stream ingestion job fails, manual intervention is required to ensure that no data is duplicated or omitted. Spark Streaming can guarantee the at-least-once semantics but not the exactly-once semantics.

Hive 0.13 and later support transactions. Hive 2.0 and later provide the Hive Streaming feature to support stream ingestion. However, Hive Streaming is not widely used due to the following causes:

  • Hive modifies underlying files to implement the transaction feature. As a result, data in the common storage format can only be read by Hive. You cannot use other tools, such as Spark SQL or Presto, to read data in the storage format.

  • Hive transactions only support the Optimized Row Columnar (ORC) format.

  • Hive uses the merge-on-read mode. Hive needs to sort and merge small files during data reading. When the number of small files increases, the read performance sharply degrades. Therefore, you must manually merge small files at the earliest opportunity. However, small files often fail to be merged, which affects the business efficiency.

  • You can use Hive Streaming only in data warehouse scenarios. Hive Streaming is not suitable in data lake scenarios because data sources and data requirements in data lake scenarios are diverse.

Now

You can use Delta Lake to ingest streaming data in a convenient manner. Procedure:

  1. Create a table.

  2. Start a Spark Streaming job to write streaming data to the table.

  3. Periodically optimize data. For example, you can periodically optimize data after data is written to a partition.

  4. Periodically vacuum historical data. For example, you can vacuum data every day.

Example

In this example, data is read from Kafka and written to a Delta table.

  1. Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following commands to create a Kafka topic:

    sudo su - kafka
    kafka-topics.sh --partitions 3 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic delta_stream_sample --create
    Note

    core-1-1 indicates the internal IP address or hostname of the broker node in the Dataflow cluster.

  3. Prepare a Python script to continuously send data to Kafka.

    #! /usr/bin/env python3
    
    import json
    import time
    
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    
    bootstrap = ['core-1-1:9092']
    topic = 'delta_stream_sample'
    
    def gnerator():
        id = 0
        line = {}
        while True:
            line['id'] = id
            line['date'] = '2019-11-11'
            line['name'] = 'Robert'
            line['sales'] = 123
            yield line
            id = id + 1  
    
    def sendToKafka():
        producer = KafkaProducer(bootstrap_servers=bootstrap)
    
        for line in gnerator():
            data = json.dumps(line).encode('utf-8')
    
            # Asynchronous by default
            future = producer.send(topic, data)
    
            # Block for 'synchronous' sends
            try:
                record_metadata = future.get(timeout=10)
            except KafkaError as e:
                # Decide what to do if produce request failed
                pass
            time.sleep(0.1)
    
    sendToKafka()

    For convenience, all data records are the same except for the ID.

    {"id": 0, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 1, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 2, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 3, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 4, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 5, "date": "2019-11-11", "name": "Robert", "sales": 123}
  4. Start a Spark Streaming job to read data from Kafka and write the data to a Delta table.

    1. Write Spark code.

      Sample code in Scala:

      import org.apache.spark.SparkConf
      import org.apache.spark.sql.{SparkSession, functions}
      import org.apache.spark.sql.types.{DataTypes, StructField}
      
      object StreamToDelta {
        def main(args: Array[String]): Unit = {
          val targetDir = "/tmp/delta_table"
          val checkpointLocation = "/tmp/delta_table_checkpoint"
          // 192.168.XX.XX is the internal IP address of Kafka.
          val bootstrapServers = "192.168.XX.XX:9092"
          val topic = "delta_stream_sample"
      
          val schema = DataTypes.createStructType(Array[StructField](
            DataTypes.createStructField("id", DataTypes.LongType, false),
            DataTypes.createStructField("date", DataTypes.DateType, false),
            DataTypes.createStructField("name", DataTypes.StringType, false),
            DataTypes.createStructField("sales", DataTypes.StringType, false)))
      
          val sparkConf = new SparkConf()
      
          // StreamToDelta is the class name of Scala.
          val spark = SparkSession
            .builder()
            .config(sparkConf)
            .appName("StreamToDelta")
            .getOrCreate()
      
          val lines = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrapServers)
            .option("subscribe", topic)
            .option("maxOffsetsPerTrigger", 1000)
            .option("startingOffsets", "earliest")
            .option("failOnDataLoss", value = false)
            .load()
            .select(functions.from_json(functions.col("value").cast("string"), schema).as("json"))
            .select("json.*")
      
          val query = lines.writeStream
            .outputMode("append")
            .format("delta")
            .option("checkpointLocation", checkpointLocation)
            .start(targetDir)
      
          query.awaitTermination()
        }
      }
    2. Package the code, and deploy the code to the DataLake cluster.

      1. After you debug the code on your on-premises machine, run the following command to package the code:

        mvn clean install
      2. Log on to the DataLake cluster in SSH mode. For more information, see Log on to a cluster.

      3. Upload the JAR package to the DataLake cluster.

        In this example, the JAR package is uploaded to the root directory of the DataLake cluster.

    3. Submit and run a Spark Streaming job.

      Run the spark-submit command to submit and run the Spark Streaming job:

      spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --driver-memory 1g \
       --executor-cores 2 \
       --executor-memory 3g \
       --num-executors 1 \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
       --class com.aliyun.delta.StreamToDelta \
       delta-demo-1.0.jar
      Note

      In this example, a JAR package named delta-demo-1.0.jar is used. You can change the value of the --class parameter and the name of the JAR package based on your business requirements.

  5. Start the spark-shell client and check whether data is written to the Delta table.

    • Scala

      1. Run the following command to go to the spark-shell client:

        spark-shell --master local 
      2. Execute the following Scala statement to query data:

        val df = spark.read.format("delta").load("/tmp/delta_table")
        df.select("*").orderBy("id").show(10000)
    • SQL

      1. Run the following command to go to the streaming-sql client:

        streaming-sql --master local
      2. Execute the following SQL statement to query data:

        SELECT * FROM delta_table ORDER BY id LIMIT 10000;

        The result indicates that 2,285 data records are written to the Delta table.

        |2295|2019-11-11|Robert|  123|
        |2296|2019-11-11|Robert|  123|
        |2297|2019-11-11|Robert|  123|
        |2275|2019-11-11|Robert|  123|
        |2276|2019-11-11|Robert|  123|
        |2277|2019-11-11|Robert|  123|
        |2278|2019-11-11|Robert|  123|
        |2279|2019-11-11|Robert|  123|
        |2280|2019-11-11|Robert|  123|
        |2281|2019-11-11|Robert|  123|
        |2282|2019-11-11|Robert|  123|
        |2283|2019-11-11|Robert|  123|
        |2284|2019-11-11|Robert|  123|
        |2285|2019-11-11|Robert|  123|
        +----+----------+------+-----+

Test the exactly-once semantics

Stop the Spark Streaming job and restart the job. Read the Delta table. If data is read from the position where data reading stops previously, exactly-once semantics is achieved.

  • Scala

    df.select("*").orderBy("id").show(10000)
  • SQL

    SELECT * FROM delta_table ORDER BY id LIMIT 10000;
    |2878|2019-11-11|Robert|  123|
    |2879|2019-11-11|Robert|  123|
    |2880|2019-11-11|Robert|  123|
    |2881|2019-11-11|Robert|  123|
    |2882|2019-11-11|Robert|  123|
    |2883|2019-11-11|Robert|  123|
    |2884|2019-11-11|Robert|  123|
    |2885|2019-11-11|Robert|  123|
    |2886|2019-11-11|Robert|  123|
    |2887|2019-11-11|Robert|  123|
    |2888|2019-11-11|Robert|  123|
    |2889|2019-11-11|Robert|  123|
    |2890|2019-11-11|Robert|  123|
    |2891|2019-11-11|Robert|  123|