AnalyticDB for MySQL は、Object Storage Service (OSS) をバックエンドストレージとするデータレイク向けテーブルフォーマットである Delta と統合されています。Delta は INSERT、UPDATE、DELETE 操作をサポートします。この統合により、OSS に格納された Delta 外部テーブルに対して Spark SQL を実行してデータを読み取ったり書き込んだりできます。
AnalyticDB for MySQL には、組み込みの Delta パッケージ(バージョン 2.0.2)が含まれています。組み込みバージョンが要件を満たさない場合は、カスタム Delta パッケージを代わりにロードしてください。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
AnalyticDB for MySQL Enterprise Edition、Basic Edition、または Data Lakehouse Edition のクラスター
ジョブ リソースグループ クラスター向けの
クラスター用のデータベースアカウント
制限事項
XIHE エンジンでは、Delta テーブルの読み取りおよび書き込みはできません。
AnalyticDB for MySQL Spark は対応する Delta バージョンを追跡しますが、Delta 固有の問題や Delta バージョン間の互換性に関するトラブルシューティングは行いません。
Delta 外部テーブルへのデータの読み取りおよび書き込み
環境のセットアップ、Delta 外部テーブルの作成、データの書き込み、および結果のクエリ実行を行う手順に従ってください。
ステップ 1:SQL エディターを開く
AnalyticDB for MySQL コンソールにログインします。左上隅でリージョンを選択し、左側のナビゲーションウィンドウから [クラスター] をクリックします。
管理対象のクラスターを見つけ、クラスター ID をクリックします。
左側のナビゲーションウィンドウから、[ジョブ開発] > [SQL 開発] を選択します。
[SQLConsole 1] タブで、Spark エンジンおよびジョブリソースグループまたは Spark インタラクティブリソースグループを選択します。
ステップ 2:Delta 環境の設定(カスタムパッケージのみ)
組み込み Delta パッケージを使用する場合は、このステップはスキップしてください。
バージョンの非互換性を避けるため、AnalyticDB for MySQL Spark 3.2.0 と互換性のある Delta パッケージのバージョンをご使用ください。
次のステートメントを実行して、カスタム Delta JAR ファイルをロードし、Spark を設定します。
-- カスタム Delta パッケージのロード(JAR を事前に OSS にアップロードしてください)
ADD JAR oss://<bucket_name>/path/to/delta-core_xx.jar;
ADD JAR oss://<bucket_name>/path/to/delta-storage-xx.jar;
-- AnalyticDB for MySQL Spark が OSS コネクタを使用するよう設定
SET spark.adb.connectors=oss;
-- Spark との Delta Lake 統合を有効化
SET spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension;
SET spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;<bucket_name> をご利用の OSS バケット名に置き換え、JAR ファイルのパスも適宜更新してください。
ステップ 3:外部データベースおよび Delta 外部テーブルの作成
データベースを作成します。既にデータベースが存在する場合は、このステップをスキップしてください。
CREATE DATABASE IF NOT EXISTS external_delta_db LOCATION "oss://<bucket_name>/test/";"oss://<bucket_name>/test/"をご利用の OSS パスに置き換えてください。Delta 外部テーブルを作成します。
CREATE TABLE IF NOT EXISTS external_delta_db.delta_test_tbl ( id INT, name STRING, age INT ) USING delta PARTITIONED BY (age) LOCATION "oss://<bucket_name>/test/delta_test_tbl";"oss://<bucket_name>/test/delta_test_tbl"をテーブルデータを格納する OSS パスに置き換えてください。
ステップ 4:データの書き込み
INSERT
Delta では、4 種類の INSERT パターンがサポートされています。ご使用のユースケースに応じて選択してください。
| ステートメント | 動作 |
|---|---|
INSERT INTO | 重複排除を行わず、行を追加 |
INSERT OVERWRITE(パーティション指定なし) | テーブル内の既存データをすべて置き換え |
INSERT OVERWRITE ... PARTITION(col=val) | 特定の静的パーティション内のデータを置き換え |
INSERT OVERWRITE ... PARTITION(col) | 一致する動的パーティション内のデータを置き換え |
行の追加:
INSERT INTO external_delta_db.delta_test_tbl VALUES (1, 'lisa', 10), (2, 'jams', 10);テーブル内の全データを置き換え:
INSERT OVERWRITE external_delta_db.delta_test_tbl VALUES (2, 'zhangsan', 10), (4, 'lisi', 30);静的パーティション内のデータを置き換え:
INSERT OVERWRITE external_delta_db.delta_test_tbl PARTITION (age=17) VALUES (3, 'anna');動的パーティション内のデータを置き換え:
INSERT OVERWRITE external_delta_db.delta_test_tbl PARTITION (age) VALUES (1, 'bom', 10);UPDATE
UPDATE external_delta_db.delta_test_tbl SET name = 'box' WHERE id = 1;name 列を、id = 1 の行で box に更新します。
DELETE
DELETE FROM external_delta_db.delta_test_tbl WHERE id = 1;このステートメントは、id = 1 の行を削除します。
ステップ 5:データのクエリ実行
SELECT * FROM external_delta_db.delta_test_tbl;