AnalyticDB for MySQL は Apache Hudi と統合されており、Spark SQL を使用して Object Storage Service (OSS) に保存されている Hudi 外部テーブルの読み取りと書き込みができます。Hudi テーブルは INSERT、UPDATE、DELETE 操作をサポートしています。
前提条件
開始する前に、以下をご確認ください:
予約済みストレージリソースが 0 AnalyticDB 計算ユニット (ACU) を超える AnalyticDB for MySQL Data Lakehouse Edition クラスター
Data Lakehouse Edition クラスターの予約済みストレージリソースは 0 ACU を超える必要があります。
クラスター用に作成されたジョブ リソースグループです。詳細については、Data Lakehouse Edition「リソースグループの作成」をご参照ください。Data Lakehouse Edition
クラスターのデータベースアカウント
ステップ 1:SQL 開発を開く
AnalyticDB for MySQL コンソールにログインします。左上の隅で、リージョンを選択します。左側のナビゲーションウィンドウで、[クラスター] をクリックします。[Data Lakehouse Edition] タブで、対象のクラスターを見つけてクラスター ID をクリックします。
左側のナビゲーションウィンドウで、[ジョブ開発] > [SQL 開発] を選択します。
[SQLConsole] タブで、Spark エンジンとジョブリソースグループを選択します。
ステップ 2:データベースと Hudi 外部テーブルの作成
以下の SQL ステートメントをバッチモードまたはインタラクティブモードで実行します。詳細については、「Spark SQL 実行モード」をご参照ください。
データベースの作成
データベースが既に存在する場合は、このステップをスキップしてください。
CREATE DATABASE adb_external_db_hudi
LOCATION 'oss://<bucket_name>/test/'; -- ご利用の OSS パスに置き換えてくださいHudi 外部テーブルの作成
CREATE TABLE adb_external_db_hudi.test_hudi_tbl (
`id` int,
`name` string,
`age` int
) USING hudi
TBLPROPERTIES (
primaryKey = 'id',
preCombineField = 'age'
)
PARTITIONED BY (age)
LOCATION 'oss://<bucket_name>/test/table/'; -- ご利用の OSS パスに置き換えてください以下のパスの制約に従ってください:
データベースと外部テーブルは、同じ OSS バケットを使用する必要があります。
外部テーブルの OSS パスは、データベースパスより少なくとも 1 つ深いディレクトリレベルにあり、かつデータベースパス内にネストされている必要があります。
primaryKeyは必須です。preCombineFieldはオプションですが、省略して UPDATE を実行するとエラーが返されます。
ステップ 3:データの書き込み
以下の SQL ステートメントをバッチモードまたはインタラクティブモードで実行します。詳細については、「Spark SQL 実行モード」をご参照ください。
INSERT
以下のいずれかのメソッドを使用してデータを挿入します:
INSERT INTO — テーブルに行を追加します:
INSERT INTO adb_external_db_hudi.test_hudi_tbl VALUES (1, 'lisa', 10), (2, 'jams', 10);INSERT OVERWRITE — 既存のすべてのデータを置き換えます:
INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl VALUES (1, 'lisa', 10), (2, 'jams', 20);静的パーティションを指定した INSERT OVERWRITE — 特定のパーティションのデータを置き換えます:
INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl PARTITION (age=10) VALUES (1, 'anna');動的パーティションを指定した INSERT OVERWRITE — データを置き換え、データからパーティション値を導出します:
INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl PARTITION (age) VALUES (1, 'bom', 10);UPDATE
特定の行を更新します — この例では、id = 2 の行の name を 'box' に設定します:
UPDATE adb_external_db_hudi.test_hudi_tbl SET name = 'box' WHERE id = 2;DELETE
特定の行を削除します — この例では、id = 1 の行を削除します:
DELETE FROM adb_external_db_hudi.test_hudi_tbl WHERE id = 1;同時実行制御
Hudi 外部テーブルは、ロックプロバイダーベースの同時実行制御メカニズムを使用します。異なるデータ範囲への複数の同時書き込みは許可されますが、書き込み競合を防ぐためにデータ範囲が重複してはなりません。
オープンソースの Hudi JAR パッケージを使用する場合、MdsBasedLockProviderは同時実行制御に利用できません。Hudi 構成のパラメーター名は、hoodie.*プレフィックスを使用します。これは Hudi プロジェクトの元の内部命名規則であり、タイプミスではありません。
同時 DML 操作を実行する前に、次のパラメーターを設定します。Hudi の同時実行制御メカニズムの詳細については、「同時実行制御」をご参照ください。
SET hoodie.cleaner.policy.failed.writes = LAZY;
SET hoodie.write.concurrency.mode = OPTIMISTIC_CONCURRENCY_CONTROL;
SET hoodie.write.lock.provider = org.apache.hudi.sync.adb.MdsBasedLockProvider;| パラメーター | 値 | 必須 | 説明 |
|---|---|---|---|
hoodie.cleaner.policy.failed.writes | LAZY | はい | ダーティデータのクリーンアップポリシー。LAZY は、ハートビートの有効期限が切れるまで不完全な書き込みのクリーンアップを遅延させます。これは同時書き込みシナリオで必須です。 |
hoodie.write.concurrency.mode | OPTIMISTIC_CONCURRENCY_CONTROL | はい | 同時実行モード。OPTIMISTIC_CONCURRENCY_CONTROL は複数のライターを許可し、各書き込みが完了する前に競合をチェックします。競合が検出された場合、書き込みは失敗します。 |
hoodie.write.lock.provider | org.apache.hudi.sync.adb.MdsBasedLockProvider | はい | ロックプロバイダークラス。org.apache.hudi.common.lock.LockProvider のサブクラスである必要があります。 |
ステップ 4:データのクエリ
以下の SQL ステートメントをバッチモードまたはインタラクティブモードで実行します。詳細については、「Spark SQL 実行モード」をご参照ください。
Spark SQL はクエリ結果ではなく、成功または失敗のメッセージを返します。データを表示するには、[Spark JAR 開発] ページに移動し、[アプリケーション] タブをクリックし、対象のアプリケーションの [操作] 列にある [ログ] をクリックします。詳細については、Spark エディターのトピックの「Spark アプリケーションに関する情報の表示」セクションをご参照ください。
SELECT * FROM adb_external_db_hudi.test_hudi_tbl;