This topic describes how to configure Delta Lake in E-MapReduce (EMR) and provides examples of common commands.

Configuration information

Default configuration of Delta Lake in EMR:
  • Spark 2.X
    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
  • Spark 3.X
    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
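
You can verify that a running Spark SQL or streaming-sql session has picked up these settings by querying the properties; each statement returns the key and its current value. Note that spark.sql.catalog.spark_catalog applies only to Spark 3.X:
  SET spark.sql.extensions;
  SET spark.sql.catalog.spark_catalog;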

Common commands

  • Create a table
    CREATE TABLE delta_table (id INT) USING delta;
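    Delta tables can also be partitioned. The following sketch is an illustration; the table name and the dt column are examples, not part of the preceding setup:
    CREATE TABLE delta_table_partitioned (id INT, dt STRING)
    USING delta
    PARTITIONED BY (dt);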
  • Insert data into a table
    INSERT INTO delta_table VALUES 0,1,2,3,4;
  • Overwrite the existing data in a table
    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
  • Query data
    SELECT * FROM delta_table;
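    Each write to a Delta table produces a new version in the transaction log. If the Delta version on your cluster supports the command, you can inspect the table history:
    DESCRIBE HISTORY delta_table;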
  • Update data
    UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0; -- Add 100 to the IDs that are even numbers.
  • Delete data
    DELETE FROM delta_table WHERE mod(id, 2) = 0; -- Delete the records whose IDs are even numbers.
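    Deleted records remain in old data files until those files are cleaned up. If the Delta version on your cluster supports the VACUUM command, you can remove data files that the table no longer references:
    VACUUM delta_table;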
  • Merge data
    1. Create a source table named newData for the merge operation.
      CREATE TABLE newData(id INT) USING delta;
    2. Insert data into the newData table.
      INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
    3. Merge data in the newData table into the delta_table table. If a record in the newData table has the same ID as a record in the delta_table table, the ID of the matching record in the delta_table table is set to the source ID plus 100. Otherwise, the record from the newData table is inserted into the delta_table table.
      MERGE INTO delta_table AS target
        USING newData AS source
        ON target.id = source.id
        WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
        WHEN NOT MATCHED THEN INSERT *;
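    MERGE can also delete matched records instead of updating them. The following variant of the preceding statement removes every record in the delta_table table whose ID exists in the newData table and inserts the rest:
      MERGE INTO delta_table AS target
        USING newData AS source
        ON target.id = source.id
        WHEN MATCHED THEN DELETE
        WHEN NOT MATCHED THEN INSERT *;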
  • Read data from a Delta table in streaming mode
    1. Create a destination table into which you want to write the data that is read from the Delta table.
      CREATE TABLE stream_debug_table (id INT) USING delta;
    2. Create a stream.
      CREATE SCAN stream_delta_table ON delta_table USING STREAM;
      Note In this example, delta_table is an existing Delta table.
    3. Write data to the destination table.
      CREATE STREAM job
      OPTIONS (
        triggerType = 'ProcessingTime',
        checkpointLocation = '/tmp/streaming_read_cp'
      )
      INSERT INTO stream_debug_table
      SELECT *
      FROM stream_delta_table;
  • Write data to a Delta table in streaming mode
    1. Create a Kafka pipeline table.
      CREATE TABLE IF NOT EXISTS kafka_topic
      USING kafka
      OPTIONS (
        kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
        subscribe = "${TOPIC_NAME}"
      );
      Note In the preceding code, kafka.bootstrap.servers specifies the internal IP address and port number of a Kafka broker in the Kafka cluster. subscribe specifies the name of a topic.
    2. Create a stream.
      CREATE SCAN stream_kafka_topic ON kafka_topic USING STREAM;
    3. Write the data that is read from the Kafka topic to the Delta table in streaming mode. The nested CAST in the following statement assumes that each Kafka message value is the string form of an integer ID; adjust it to match your message format.
      CREATE STREAM job
      OPTIONS (
        triggerType = 'ProcessingTime',
        checkpointLocation = '/tmp/streaming_write_cp'
      )
      INSERT INTO delta_table
      SELECT CAST(CAST(value AS STRING) AS INT) AS id
      FROM stream_kafka_topic;
      Note Use a checkpoint directory that is not shared with other stream jobs.

Example

  1. Log on to your cluster in SSH mode. For more information, see Log on to a cluster.
  2. Run the following command to start streaming-sql:
    streaming-sql
    Note If the Delta Lake service is added to your cluster, you can run the streaming-sql command directly. If the service is not added, run the following command to use Delta Lake:
    streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
  3. Run the following command to create a destination table:
    CREATE TABLE stream_debug_table (id INT) USING DELTA;
  4. Run the following command to create a stream:
    CREATE SCAN stream_delta_table ON delta_table USING STREAM;
  5. Run the following command to write data in the source table named delta_table to the destination table in streaming mode:
    CREATE STREAM job
    OPTIONS (
      triggerType = 'ProcessingTime',
      checkpointLocation = '/tmp/streaming_read_cp'
    )
    INSERT INTO stream_debug_table
    SELECT *
    FROM stream_delta_table;
  6. Start another streaming-sql client and perform the following operations to insert data into the source table and query data in the destination table:
    1. Run the following command to query existing data in the destination table:
      SELECT * FROM stream_debug_table;
    2. Run the following command to insert data into the source table:
      INSERT INTO delta_table VALUES 801, 802;
    3. Run the following command to check whether the data that is inserted into the source table is written to the destination table in streaming mode:
      SELECT * FROM stream_debug_table;
    4. Run the following command to insert data into the source table:
      INSERT INTO delta_table VALUES 901, 902;
    5. Run the following command to check whether the data that is inserted into the source table is written to the destination table in streaming mode:
      SELECT * FROM stream_debug_table;
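      Note If the stream job is running correctly, this query returns the four inserted IDs: 801, 802, 901, and 902. The output order may vary.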