This topic describes the configuration of Delta Lake in E-MapReduce (EMR) and provides examples of common commands.
Configuration information
Default configuration of Delta Lake in EMR:
Spark 2.X:
spark.sql.extensions              io.delta.sql.DeltaSparkSessionExtension

Spark 3.X:
spark.sql.extensions              io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog   org.apache.spark.sql.delta.catalog.DeltaCatalog
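On EMR these properties are preset. If you use a Spark 3.X client where they are not, one way to set them (a sketch, assuming a standard Spark deployment) is to add the same key-value pairs to spark-defaults.conf:

```
spark.sql.extensions              io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog   org.apache.spark.sql.delta.catalog.DeltaCatalog
```

Alternatively, the same properties can be passed with --conf flags on the spark-sql or spark-submit command line.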
Common sample commands
Create a table
CREATE TABLE delta_table (id INT) USING delta;

Insert data into a table
INSERT INTO delta_table VALUES 0,1,2,3,4;

Overwrite the existing data in a table
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;

Query data
SELECT * FROM delta_table;

Update data
Add 100 to all IDs that are even numbers.
UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;

Delete data
Delete the records whose IDs are even numbers.
DELETE FROM delta_table WHERE mod(id, 2) = 0;
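To make the combined effect of the statements above concrete, the following plain-Python trace (an illustration of the SQL semantics only, not Delta Lake code) mirrors the table contents after each command:

```python
# Each step mirrors one SQL statement from the examples above.
rows = []                                   # CREATE TABLE delta_table (id INT)
rows.extend([0, 1, 2, 3, 4])                # INSERT INTO delta_table VALUES 0,1,2,3,4
rows = [5, 6, 7, 8, 9]                      # INSERT OVERWRITE replaces all existing rows
rows = [r + 100 if r % 2 == 0 else r
        for r in rows]                      # UPDATE ... SET id = id + 100 WHERE mod(id, 2) = 0
rows = [r for r in rows if r % 2 != 0]      # DELETE FROM ... WHERE mod(id, 2) = 0
print(rows)                                 # rows a final SELECT would return: [5, 7, 9]
```

Note that the UPDATE turns 6 and 8 into 106 and 108, which are still even, so the subsequent DELETE removes them.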
Merge data
Create a source table for the merge operation.
CREATE TABLE newData(id INT) USING delta;

Insert data into the table.
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;

Merge data in the newData table into the delta_table table. If the ID of a record in the newData table matches the ID of a record in the delta_table table, the matching record in delta_table is updated to the source ID plus 100. Otherwise, the record is inserted into delta_table.
MERGE INTO delta_table AS target USING newData AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.id = source.id + 100 WHEN NOT MATCHED THEN INSERT *;

Read data from a Delta table in streaming mode
Create a destination table into which you want to write data in streaming mode.
CREATE TABLE stream_debug_table (id INT) USING delta;

Create a stream.
CREATE SCAN stream_delta_table ON delta_table USING STREAM;

Note: In this example, delta_table is an existing Delta table.

Write data to the destination table in streaming mode.
CREATE STREAM job
OPTIONS (
  triggerType='ProcessingTime',
  checkpointLocation='/tmp/streaming_read_cp'
)
INSERT INTO stream_debug_table
SELECT * FROM stream_delta_table;
Example
1. Log on to your cluster in SSH mode. For more information, see Log on to a cluster.
2. Run the following command to start streaming-sql:
   streaming-sql
   Note: If the Delta Lake service is added to your cluster, you can directly run the streaming-sql command. If the service is not added to your cluster, run the following command to use Delta Lake:
   streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
3. Run the following command to create a destination table:
   CREATE TABLE stream_debug_table (id INT) USING DELTA;
4. Run the following command to create a stream:
   CREATE SCAN stream_delta_table ON delta_table USING STREAM;
5. Run the following command to write data from the source table named delta_table to the destination table in streaming mode:
   CREATE STREAM job OPTIONS ( triggerType='ProcessingTime', checkpointLocation='/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
6. Start another streaming-sql client and perform the following operations to insert data into the source table and query data in the destination table:
   a. Query the existing data in the destination table:
      SELECT * FROM stream_debug_table;
   b. Insert data into the source table:
      INSERT INTO delta_table VALUES 801, 802;
   c. Check whether the data that is inserted into the source table is written to the destination table in streaming mode:
      SELECT * FROM stream_debug_table;
   d. Insert more data into the source table:
      INSERT INTO delta_table VALUES 901, 902;
   e. Check again whether the new data is written to the destination table:
      SELECT * FROM stream_debug_table;