
E-MapReduce: Basic usage

Last Updated: Mar 26, 2026

Delta Lake is pre-integrated in E-MapReduce (EMR), giving your Apache Spark workloads ACID transactions, scalable metadata handling, and unified batch and streaming data processing on your existing data lake storage.

Configuration information

Delta Lake is enabled in EMR through Spark session extensions. The required configuration differs by Spark version.

Spark 2.X

spark.sql.extensions  io.delta.sql.DeltaSparkSessionExtension

Spark 3.X

spark.sql.extensions        io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog  org.apache.spark.sql.delta.catalog.DeltaCatalog

These settings are applied by default when you add the Delta Lake service to your EMR cluster. To apply them manually, pass them as --conf flags when starting streaming-sql.
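For example, on a Spark 3.X cluster a manual launch might look like the following (the JAR path is a placeholder; use the Delta Lake JAR shipped with your EMR version):

```shell
streaming-sql \
  --jars /path/to/delta-core.jar \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
```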

Common operations

All examples in this section use a Delta table named delta_table.

Create a table

CREATE TABLE delta_table (id INT) USING delta;

Insert data

INSERT INTO delta_table VALUES 0,1,2,3,4;

Verify the insert:

SELECT * FROM delta_table;

Overwrite data

INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;

Verify the result:

SELECT * FROM delta_table;

Update data

Add 100 to all even IDs:

UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;

Verify the update:

SELECT * FROM delta_table;

Delete data

Delete all records with even IDs:

DELETE FROM delta_table WHERE mod(id, 2) = 0;

Verify the deletion:

SELECT * FROM delta_table;
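The state changes produced by the statements above can be traced with a small plain-Python simulation. This is an illustration of the SQL semantics only, not Spark or Delta Lake code; the list stands in for the single-column table.

```python
# Plain-Python trace of the SQL statements above.
table = []

# INSERT INTO delta_table VALUES 0,1,2,3,4;
table += [0, 1, 2, 3, 4]

# INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
table = [5, 6, 7, 8, 9]

# UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
table = [i + 100 if i % 2 == 0 else i for i in table]
print(sorted(table))  # [5, 7, 9, 106, 108]

# DELETE FROM delta_table WHERE mod(id, 2) = 0;
table = [i for i in table if i % 2 != 0]
print(sorted(table))  # [5, 7, 9]
```

After the delete, only the three odd IDs 5, 7, and 9 remain, which is the starting state for the merge example that follows.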

Merge data

The MERGE INTO statement upserts data from a source table into a Delta table. Use it to apply conditional updates and inserts in a single atomic operation.

  1. Create a source table for the merge operation.

    CREATE TABLE newData(id INT) USING delta;
  2. Insert data into the source table.

    INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
  3. Merge data from newData into delta_table. If the ID of a record in newData matches an ID in delta_table, add 100 to the matched ID. Otherwise, insert the record directly.

    MERGE INTO delta_table AS target
      USING newData AS source
      ON target.id = source.id
      WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
      WHEN NOT MATCHED THEN INSERT *;
  4. Verify the merge result.

    SELECT * FROM delta_table;
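As a sanity check on the expected outcome, the merge semantics can be traced in plain Python. This illustrates what MERGE INTO does with the example data; it is not a Spark API.

```python
# Plain-Python trace of the MERGE INTO statement above.
# Starting state: delta_table holds 5, 7, 9 after the DELETE step.
target = {5, 7, 9}
source = set(range(10))  # newData: 0..9

matched = target & source      # ids present in both tables
not_matched = source - target  # ids only in newData

# WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
# WHEN NOT MATCHED THEN INSERT *
result = {i + 100 for i in matched} | not_matched | (target - source)
print(sorted(result))  # [0, 1, 2, 3, 4, 6, 8, 105, 107, 109]
```

The matched IDs 5, 7, and 9 become 105, 107, and 109, while the remaining rows from newData are inserted unchanged.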

Read a Delta table in streaming mode

These steps use the EMR streaming-sql CLI to set up a streaming pipeline from an existing Delta table.

The verification steps require a second streaming-sql client session running at the same time as the streaming job. Open a second terminal before you begin the verification.

Prerequisites

Before you begin, ensure that you have:

  • An EMR cluster with SSH access

  • An existing Delta table named delta_table

Set up the streaming pipeline

  1. Log on to your cluster in SSH mode.

  2. Start the streaming-sql client.

    streaming-sql
    If Delta Lake is not added as a service to your cluster, start streaming-sql with the Delta Lake JAR and configuration instead:
    streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
  3. Create the destination table.

    CREATE TABLE stream_debug_table (id INT) USING delta;
  4. Create a stream scan on the source Delta table.

    CREATE SCAN stream_delta_table ON delta_table USING STREAM;
  5. Start the streaming job to write data from delta_table to stream_debug_table.

    CREATE STREAM job
    OPTIONS (
      triggerType='ProcessingTime',
      checkpointLocation='/tmp/streaming_read_cp'
    )
    INSERT INTO stream_debug_table
    SELECT * FROM stream_delta_table;

Verify streaming behavior

  1. In a second streaming-sql client session, query the current state of the destination table.

    SELECT * FROM stream_debug_table;
  2. In the second session, insert data into the source table.

    INSERT INTO delta_table VALUES 801, 802;
  3. Query the destination table again to confirm the new data was streamed through.

    SELECT * FROM stream_debug_table;
  4. Insert another batch of data into the source table.

    INSERT INTO delta_table VALUES 901, 902;
  5. Query the destination table to confirm the second batch was streamed through.

    SELECT * FROM stream_debug_table;
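Conceptually, the streaming job behaves like an incremental copy driven by a checkpointed offset: each trigger reads only the rows appended to the source since the last trigger and writes them to the destination. The loop below is a plain-Python sketch of that behavior, not the streaming-sql engine itself.

```python
# Minimal sketch of checkpoint-driven incremental reads.
source = []     # stands in for delta_table
dest = []       # stands in for stream_debug_table
checkpoint = 0  # offset of the last row already processed

def trigger():
    """One processing-time trigger of the streaming job."""
    global checkpoint
    new_rows = source[checkpoint:]  # rows appended since last trigger
    dest.extend(new_rows)
    checkpoint = len(source)

source += [801, 802]  # INSERT INTO delta_table VALUES 801, 802;
trigger()
print(dest)           # [801, 802]

source += [901, 902]  # INSERT INTO delta_table VALUES 901, 902;
trigger()
print(dest)           # [801, 802, 901, 902]
```

Each batch appears in the destination exactly once because the checkpoint records how far the job has already read, which is also why the real job needs a durable checkpointLocation.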