このトピックでは、AnalyticDB for MySQL の Spark SQL を使用して、エンタープライズレベルのデータレイクを構築し、Apache Iceberg テーブルのデータライフサイクルを管理する方法について説明します。
AnalyticDB for MySQL の Spark エンジンには、iceberg という名前の Iceberg カタログが組み込まれています。このカタログは、AnalyticDB for MySQL のグローバルメタサービスを使用します。
データレイクへのデータのバッチ取り込み
Iceberg の CREATE TABLE AS SELECT (CTAS) 文を使用すると、生データを迅速に取り込むことができます。この文は、単一の操作でテーブルを作成し、データを書き込みます。
CREATE DATABASE IF NOT EXISTS iceberg_lakehouse
LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/'
WITH DBPROPERTIES (catalog = 'mix');
-- 1. クリーンアップ
DROP TABLE IF EXISTS iceberg_lakehouse.iceberg_orders_staging;
-- 2. CTAS を使用してデータを迅速に取り込む
CREATE TABLE iceberg_lakehouse.iceberg_orders_staging
USING iceberg
COMMENT 'ブロンズレイヤー: Iceberg ステージング'
AS
SELECT * FROM ADB_External_TPCH_10GB.external_orders;隠れパーティションテーブルの作成
隠れパーティショニング機能を使用すると、次のデータ定義言語 (DDL) 文の PARTITIONED BY (years(o_orderdate)) のように、元の列の変換に基づいてパーティションを定義できます。追加のパーティションキー列を作成する必要はありません。
-- 1. クリーンアップ
DROP TABLE IF EXISTS iceberg_lakehouse.iceberg_orders_prod;
-- 2. 本番テーブルの定義 (Iceberg のパーティション変換を使用)
CREATE TABLE iceberg_lakehouse.iceberg_orders_prod (
o_orderkey LONG,
o_custkey LONG,
o_orderstatus STRING,
o_totalprice DECIMAL(18,2),
o_orderdate DATE,
o_orderpriority STRING,
o_clerk STRING,
o_shippriority INT,
o_comment STRING
)
USING iceberg
-- [コア] Iceberg 固有のパーティション変換構文
-- 追加の o_order_year 列は不要です。years 変換は日付列に直接適用されます。
PARTITIONED BY (years(o_orderdate))
COMMENT 'シルバーレイヤー: Iceberg 隠れパーティショニング';
-- 3. データの書き込み
-- 元の列のみを選択した場合でも、Iceberg はバックグラウンドで自動的にパーティションを計算し、データをアーカイブします。
INSERT INTO
iceberg_lakehouse.iceberg_orders_prod
SELECT
o_orderkey, o_custkey, o_orderstatus,
o_totalprice, o_orderdate, o_orderpriority,
o_clerk, o_shippriority, o_comment
FROM
iceberg_lakehouse.iceberg_orders_staging;
-- 4. パーティションの検証
-- Iceberg のメタデータテーブルを使用すると、パーティションだけでなく、物理ファイルの分布も表示できます。
SELECT partition, file_path, record_count
FROM iceberg_lakehouse.iceberg_orders_prod.files;行レベルの変更
Iceberg は Copy-on-Write (CoW) と Merge-on-Read (MoR) をサポートしています。デフォルトは CoW です。
-- 1. 更新操作
-- シナリオ: ビジネス上の修正
UPDATE iceberg_lakehouse.iceberg_orders_prod
SET o_orderstatus = 'X'
WHERE o_orderdate = '1993-01-01' AND o_custkey = 12345;
-- ^ 注: o_orderdate で直接フィルターします。Iceberg は手動の介入なしで 1993 パーティションを自動的に特定します。
-- 2. 削除操作
DELETE FROM iceberg_lakehouse.iceberg_orders_prod
WHERE o_clerk = 'Clerk#000000001';スキーマ進化
Iceberg は、スキーマ変更において Delta Lake よりも柔軟です。列名の変更や型のアップグレードなど、完全なスキーマ進化をサポートしています。これにはデータファイルのリライトは不要で、メタデータマッピングの変更のみが必要です。
-- 1. 列名の変更 - これは Parquet や Hive では困難です。
ALTER TABLE iceberg_lakehouse.iceberg_orders_prod
RENAME COLUMN o_totalprice TO o_final_price;
-- 2. 列の追加
ALTER TABLE iceberg_lakehouse.iceberg_orders_prod
ADD COLUMN etl_ts timestamp;
-- 3. 列の順序変更 - 新しい列を中間に移動します。
ALTER TABLE iceberg_lakehouse.iceberg_orders_prod
ALTER COLUMN etl_ts AFTER o_orderdate;タイムトラベル
Iceberg テーブルの履歴スナップショットをクエリして、スナップショット ID またはタイムスタンプに基づいて以前のバージョンのデータにアクセスできます。
-- 1. スナップショット履歴を表示して snapshot_id を取得
-- Iceberg は .history または .snapshots メタデータテーブルを使用します。
SELECT * FROM iceberg_lakehouse.iceberg_orders_prod.history;
-- 2. タイムトラベルクエリ (スナップショット ID による)
-- 注: Iceberg は SYSTEM_VERSION の使用を推奨しています。
SELECT count(*)
FROM iceberg_lakehouse.iceberg_orders_prod
FOR SYSTEM_VERSION AS OF 123456789; -- これを実際のスナップショット ID に置き換えてください。
-- 3. タイムトラベルクエリ (タイムスタンプによる)
SELECT count(*)
FROM iceberg_lakehouse.iceberg_orders_prod
TIMESTAMP AS OF (current_timestamp() - INTERVAL 10 MINUTES); -- 必要に応じて時間間隔を変更してください。テーブルファイルの管理
スナップショットの有効期限切れ処理
Iceberg は Multi-Version Concurrency Control (MVCC) を使用します。データが削除されると、削除済みとしてマークされるだけです。expire_snapshots プロシージャを使用すると、期限切れのスナップショットをクリーンアップし、関連する物理ファイルを削除できます。以下の例は、一般的なクリーンアップ方法を示しています。
特定のタイムスタンプ T より前に作成されたスナップショットをクリーンアップします。
CALL system.expire_snapshots( table => 'iceberg_lakehouse.iceberg_orders_prod', older_than => TIMESTAMP '2023-10-01 00:00:00.000' -- これを実際の時刻に置き換えてください。通常、ミリ秒単位のタイムスタンプが渡されます。 )最新の N 個のスナップショットを保持します。
保持ポリシー:
older_than = T(タイムスタンプ、デフォルトは 5 日前) を使用して、有効期限のカットオフポイントを設定します。T より古いスナップショットのみがクリーンアップされます。retain_last = N(デフォルトは 1) を使用して、保持するスナップショットの最小数を指定します。一部のスナップショットが T より古い場合でも、最新の N 個のスナップショットは保持されます。特殊なユースケース: T を未来の時点に設定し、
retain_lastを指定して最新の N 個のスナップショットを保持します。以下の例でその方法を示します。-- セルで実行する場合は、最後のセミコロン (;) を削除してください。 CALL iceberg.system.expire_snapshots( table => 'iceberg_lakehouse.iceberg_orders_prod', older_than => TIMESTAMP '2027-06-30 00:00:00.000', retain_last => 1 )
孤立ファイルの削除
書き込みタスクが失敗した場合、または DROP TABLE の実行時に PURGE キーワードを追加しなかった場合、パス内に参照されていない孤立ファイルが多数生成される可能性があります。次のコマンドを使用して、これらのファイルをスキャンしてクリーンアップできます。
CALL system.remove_orphan_files(
table => 'iceberg_lakehouse.iceberg_orders_prod'
)Iceberg テーブルの削除
テーブル定義は削除しますが、物理データファイルは削除しません。
DROP TABLE iceberg_lakehouse.iceberg_orders_prod;テーブルを削除する際にデータファイルも削除するには、Spark セッションを開始するときに次のパラメーターを追加します:
spark.hive.purgeExternalTableData.enabled=true
spark.hive.purgeManagedTableData.enabled=true
テーブルとそのデータファイルを同時に削除します。
DROP TABLE iceberg_lakehouse.iceberg_orders_prod PURGE;