このトピックでは、E-MapReduce(EMR)における Delta Lake の構成情報について説明し、一般的なコマンドの例をいくつか示します。
構成情報
EMR における Delta Lake のデフォルト構成:
Spark 2.X
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtensionSpark 3.X
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
一般的なサンプルコマンド
テーブルを作成する
CREATE TABLE delta_table (id INT) USING delta;テーブルにデータを挿入する
INSERT INTO delta_table VALUES 0,1,2,3,4;テーブルの既存のデータを上書きする
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;データをクエリする
SELECT * FROM delta_table;データを更新する
偶数であるすべての
IDに 100 を加算します。UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;データを削除する
IDが偶数であるレコードを削除します。DELETE FROM delta_table WHERE mod(id, 2) = 0;
データのマージ
マージ操作のソーステーブルを作成します。
CREATE TABLE newData(id INT) USING delta;テーブルにデータを挿入します。
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;newData テーブルのデータを delta_table テーブルにマージします。 newData テーブルのレコードの
IDが delta_table テーブルのレコードの ID と同じである場合、ソーステーブルのレコードのIDに 100 が加算されます。そうでない場合は、レコードが直接挿入されます。MERGE INTO delta_table AS target USING newData AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.id = source.id + 100 WHEN NOT MATCHED THEN INSERT *;ストリーミングモードで Delta テーブルからデータを読み取ります。
ストリーミングモードでデータを書き込む先のテーブルを作成します。
CREATE TABLE stream_debug_table (id INT) USING delta;ストリームを作成します。
CREATE SCAN stream_delta_table on delta_table USING STREAM;説明この例では、delta_table は既存の Delta テーブルです。
ストリーミングモードでデータをデスティネーションテーブルに書き込みます。
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
例
SSH モードでクラスターにログオンします。詳細については、「クラスターへのログオン」をご参照ください。
次のコマンドを実行して、streaming-sql を起動します。
streaming-sql説明Delta Lake サービスがクラスターに追加されている場合は、
streaming-sqlコマンドを直接実行できます。サービスがクラスターに追加されていない場合は、次のコマンドを実行して Delta Lake を使用できます。streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension次のコマンドを実行して、デスティネーションテーブルを作成します。
CREATE TABLE stream_debug_table (id INT) USING DELTA;次のコマンドを実行して、ストリームを作成します。
CREATE SCAN stream_delta_table on delta_table USING STREAM;次のコマンドを実行して、delta_table という名前のソーステーブルのデータをストリーミングモードでデスティネーションテーブルに書き込みます。
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;別の streaming-sql クライアントを起動し、次の操作を実行して、ソーステーブルにデータを挿入し、デスティネーションテーブルのデータをクエリします。
次のコマンドを実行して、デスティネーションテーブルの既存のデータをクエリします。
SELECT * FROM stream_debug_table;次のコマンドを実行して、ソーステーブルにデータを挿入します。
INSERT INTO delta_table VALUES 801, 802;次のコマンドを実行して、ソーステーブルに挿入されたデータがストリーミングモードでデスティネーションテーブルに書き込まれたかどうかを確認します。
SELECT * FROM stream_debug_table;次のコマンドを実行して、ソーステーブルにデータを挿入します。
INSERT INTO delta_table VALUES 901, 902;次のコマンドを実行して、ソーステーブルに挿入されたデータがストリーミングモードでデスティネーションテーブルに書き込まれたかどうかを確認します。
SELECT * FROM stream_debug_table;