Delta Lake is pre-integrated in E-MapReduce (EMR), giving your Apache Spark workloads ACID transactions, scalable metadata handling, and unified batch and streaming data processing on your existing data lake storage.
Configuration information
Delta Lake is enabled in EMR through Spark session extensions. The required configuration differs by Spark version.
Spark 2.X
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
Spark 3.X
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
These settings are applied by default when you add the Delta Lake service to your EMR cluster. To apply them manually, pass them as --conf flags when starting streaming-sql.
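For example, on a Spark 3.X cluster both settings can be passed on the command line as follows (shown on one line; the exact client invocation may vary by EMR version):
streaming-sql --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog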
Common operations
All examples in this section use a Delta table named delta_table.
Create a table
CREATE TABLE delta_table (id INT) USING delta;
Insert data
INSERT INTO delta_table VALUES 0,1,2,3,4;
Verify the insert:
SELECT * FROM delta_table;
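The query should return the five inserted IDs: 0, 1, 2, 3, and 4.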
Overwrite data
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
Verify the result:
SELECT * FROM delta_table;
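Because INSERT OVERWRITE replaces the existing contents of the table rather than appending to them, the query should now return only 5, 6, 7, 8, and 9.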
Update data
Add 100 to all even IDs:
UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
Verify the update:
SELECT * FROM delta_table;
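The even IDs 6 and 8 become 106 and 108, so the query should return 5, 7, 9, 106, and 108 (row order may vary).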
Delete data
Delete all records with even IDs:
DELETE FROM delta_table WHERE mod(id, 2) = 0;
Verify the deletion:
SELECT * FROM delta_table;
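After the previous update, 106 and 108 are the only remaining even IDs, so the query should return just 5, 7, and 9.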
Merge data
The MERGE INTO statement upserts data from a source table into a Delta table. Use it to apply conditional updates and inserts in a single atomic operation.
1. Create a source table for the merge operation.
CREATE TABLE newData (id INT) USING delta;
2. Insert data into the source table.
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
3. Merge data from newData into delta_table. If the ID of a record in newData matches an ID in delta_table, add 100 to the matched ID. Otherwise, insert the record directly.
MERGE INTO delta_table AS target
USING newData AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
WHEN NOT MATCHED THEN INSERT *;
4. Verify the merge result.
SELECT * FROM delta_table;
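If the steps in this section were run in order, delta_table contained 5, 7, and 9 before the merge. Those three IDs match rows in newData and become 105, 107, and 109, while the unmatched source IDs 0, 1, 2, 3, 4, 6, and 8 are inserted as-is, so the query should return those ten values (row order may vary).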
Read a Delta table in streaming mode
These steps use the EMR streaming-sql CLI to set up a streaming pipeline from an existing Delta table.
The verification steps require a second streaming-sql client session running at the same time as the session that runs the streaming job. Open a second terminal before you begin the verification steps.
Prerequisites
Before you begin, ensure that you have:
- An EMR cluster with SSH access
- An existing Delta table named delta_table
Set up the streaming pipeline
1. Start the streaming-sql client.
streaming-sql
If Delta Lake is not added as a service to your cluster, start streaming-sql with the Delta Lake JAR and configuration instead:
streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
2. Create the destination table.
CREATE TABLE stream_debug_table (id INT) USING delta;
3. Create a stream scan on the source Delta table.
CREATE SCAN stream_delta_table ON delta_table USING STREAM;
4. Start the streaming job to write data from delta_table to stream_debug_table.
CREATE STREAM job
OPTIONS (
  triggerType='ProcessingTime',
  checkpointLocation='/tmp/streaming_read_cp'
)
INSERT INTO stream_debug_table
SELECT * FROM stream_delta_table;
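In this job definition, triggerType='ProcessingTime' runs the stream as periodic micro-batches, and checkpointLocation is the directory where the stream records its progress so the job can resume where it left off after a restart. The path /tmp/streaming_read_cp is the example used here; for real workloads, choose a durable location.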
Verify streaming behavior
1. In a second streaming-sql client session, query the current state of the destination table.
SELECT * FROM stream_debug_table;
2. In the second session, insert data into the source table.
INSERT INTO delta_table VALUES 801, 802;
3. Query the destination table again to confirm that the new data was streamed through.
SELECT * FROM stream_debug_table;
4. Insert another batch of data into the source table.
INSERT INTO delta_table VALUES 901, 902;
5. Query the destination table to confirm that the second batch was streamed through.
SELECT * FROM stream_debug_table;
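Each insert should appear in stream_debug_table shortly after the micro-batch that processes it completes; if the new IDs are missing, wait a few seconds and query again. After both batches, the destination table should include 801, 802, 901, and 902 in addition to the rows streamed from the source table's earlier contents.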