This topic describes the configuration of Delta Lake in E-MapReduce (EMR) and provides examples of common commands.
Configuration information
Default configuration of Delta Lake in EMR:
- Spark 2.X
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
- Spark 3.X
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
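If these defaults are not already in place (for example, when you start a Spark 3.X SQL session manually instead of relying on the EMR defaults), the same properties can be passed on the command line. This is only a sketch and assumes the Delta Lake jars are already on the cluster's classpath, as they are on EMR:

```shell
# Sketch: pass the Delta Lake defaults explicitly to a Spark 3.X SQL session.
# Assumes the Delta Lake jars are already available on the cluster (as in EMR).
spark-sql \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
```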
Common commands
- Create a table
CREATE TABLE delta_table (id INT) USING delta;
- Insert data into a table
INSERT INTO delta_table VALUES 0,1,2,3,4;
- Overwrite the existing data in a table
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
- Query data
SELECT * FROM delta_table;
- Update data
UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0; -- Add 100 to each even ID.
- Delete data
DELETE FROM delta_table WHERE mod(id, 2) = 0; -- Delete the records whose IDs are even.
- Merge data
- Create a source table named newData for the merge operation.
CREATE TABLE newData(id INT) USING delta;
- Insert data into the newData table.
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
- Merge data in the newData table into the delta_table table. If the ID of a record in the newData table matches the ID of a record in the delta_table table, the matched record in delta_table is updated to the source ID plus 100. Otherwise, the source record is inserted as-is.
MERGE INTO delta_table AS target USING newData AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.id = source.id + 100 WHEN NOT MATCHED THEN INSERT *;
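To confirm what the merge produced, an ordinary query works. This is just a quick check; the exact rows depend on which of the preceding commands you have already run.

```sql
-- Inspect the table after the merge: matched IDs were replaced with
-- source.id + 100, and unmatched source rows were inserted unchanged.
SELECT * FROM delta_table ORDER BY id;
```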
- Read data from a Delta table in streaming mode
- Create a destination table into which you want to write the data that is read from
the Delta table.
CREATE TABLE stream_debug_table (id INT);
- Create a stream.
CREATE SCAN stream_delta_table ON delta_table USING STREAM;
Note In this example, delta_table is an existing Delta table.
- Write data to the destination table.
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
- Write data to a Delta table in streaming mode
- Create a Kafka pipeline table.
CREATE TABLE IF NOT EXISTS kafka_topic USING kafka OPTIONS ( kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}", subscribe = "${TOPIC_NAME}" );
Note In the preceding code, kafka.bootstrap.servers specifies the internal IP address and port number of a Kafka broker in the Kafka cluster, and subscribe specifies the name of a topic.
- Create a stream.
CREATE SCAN stream_kafka_topic ON kafka_topic USING STREAM;
- Write data to the Delta table in streaming mode.
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_write_cp' ) INSERT INTO delta_table SELECT CAST(CAST(value AS STRING) AS INT) FROM stream_kafka_topic; -- Cast the Kafka message value to match the INT column of delta_table.