このトピックでは、AnalyticDB for MySQL で Spark SQL と Delta Lake テーブルフォーマットを使用して、エンタープライズデータレイクを構築し、データライフサイクルを管理する方法について説明します。
ソースレイヤーの取り込み (ブロンズレイヤー)
CREATE TABLE AS SELECT (CTAS) 文を使用すると、外部データを Delta Lake に迅速にインポートできます。この段階では、スキーマやパーティションを定義する必要はありません。
CREATE DATABASE IF NOT EXISTS deltalake_db LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/';-- 1. メタデータをクリーンアップします。
DROP TABLE IF EXISTS deltalake_db.orders_staging;
-- 2. CTAS を使用してデータを迅速に取り込みます (パーティションなし、完全レプリケーション)。
CREATE OR REPLACE TABLE deltalake_db.orders_staging
USING DELTA
COMMENT 'ブロンズレイヤー: Raw ステージングテーブル'
AS
SELECT * FROM ADB_External_TPCH_10GB.external_orders;
-- 検証: データはロードされますが、パーティションが使用されていないため、全表スキャンのパフォーマンスは低下します。
SELECT count(*) FROM deltalake_db.orders_staging;パーティションテーブルの構築
年ごとに物理的にパーティション化された本番テーブルを作成します。次に、orders_staging テーブルのデータをクリーンアップし、本番テーブルにロードします。このプロセスにより、スモールファイル問題が解決され、クエリ効率が向上します。
-- 1. メタデータをクリーンアップします。
DROP TABLE IF EXISTS deltalake_db.orders_prod;
-- 2. 本番テーブルのスキーマを定義します (明示的な o_order_year 列を含む)。
CREATE TABLE IF NOT EXISTS deltalake_db.orders_prod (
o_orderkey LONG,
o_custkey LONG,
o_orderstatus STRING,
o_totalprice DECIMAL(18,2),
o_orderdate DATE,
o_orderpriority STRING,
o_clerk STRING,
o_shippriority INT,
o_comment STRING,
o_order_year INT -- [明示的なパーティションキー列]
)
USING DELTA
PARTITIONED BY (o_order_year) -- 年ごとに物理的に分離します。
COMMENT 'シルバーレイヤー: パーティション化された本番テーブル';
-- 3. データを集計、クリーンアップ、書き込みます。
-- ステージングテーブルから読み取り、年を計算し、本番テーブルに書き込みます。
INSERT INTO deltalake_db.orders_prod
SELECT
-- ソーステーブルの元のフィールドを明示的にリストします (* は使用しません)。
o_orderkey,
o_custkey,
o_orderstatus,
o_totalprice,
o_orderdate,
o_orderpriority,
o_clerk,
o_shippriority,
o_comment,
-- パーティションフィールドを計算します。
YEAR(o_orderdate) as o_order_year
FROM deltalake_db.orders_staging;
-- 検証: パーティション情報を表示します。
DESCRIBE DETAIL deltalake_db.orders_prod;ビジネス上の変更とトランザクション (変更)
orders_prod 本番テーブルに対して、定期的なデータ修正操作を実行できます。
-- 1. 更新操作 (ACID)。
-- シナリオ: 1993 年の特定の店員のすべての注文ステータスを修正します。
UPDATE deltalake_db.orders_prod
SET o_orderstatus = 'P'
WHERE o_order_year = 1993 -- パーティションプルーニングを使用してファイルを迅速に検索します。
AND o_clerk = 'Clerk#000000001';
-- 2. 削除操作。
DELETE FROM deltalake_db.orders_prod
WHERE o_orderpriority = '5-LOW' AND o_order_year = 1992;増分マージ (アップサート)
T+1 の増分データを処理するには、新しいデータを含む一時ビューを作成します。次に、MERGE 文を使用して、増分データを本番テーブルにマージします。
-- 増分データストリームをシミュレートします。
MERGE INTO deltalake_db.orders_prod AS target
USING (
SELECT
999999999 AS o_orderkey,
12345 AS o_custkey,
'O' AS o_orderstatus,
100.00 AS o_totalprice,
cast('2025-05-20' as date) AS o_orderdate,
'1-URGENT' AS o_orderpriority,
'Clerk#Auto' AS o_clerk,
0 AS o_shippriority,
'New Order' AS o_comment,
2025 AS o_order_year -- パーティションキー列を含める必要があります。
) AS source
ON target.o_orderkey = source.o_orderkey
WHEN MATCHED THEN UPDATE SET target.o_totalprice = source.o_totalprice
WHEN NOT MATCHED THEN INSERT *;履歴の参照
データバージョン管理機能を使用して、データの以前のバージョンをクエリできます。
-- 1. 操作履歴を表示します。
DESCRIBE HISTORY deltalake_db.orders_prod;
-- 2. テーブル作成直後の状態にタイムトラベルします (バージョン 0 は CREATE TABLE、バージョン 1 は INSERT)。
-- バージョン 1 は、ステージングテーブルからデータがインポートされた直後の状態であると仮定します。
SELECT COUNT(*) FROM deltalake_db.orders_prod VERSION AS OF 1;パフォーマンスの最適化 (メンテナンス)
本番テーブルを物理的に最適化できます。この機能は、Parquet テーブルに対する Delta Lake の主要な利点です。
-- 1. スモールファイルの圧縮 (コンパクション)。
-- これにより、同じパーティション内の散在するスモールファイルが、より大きなファイルにマージされます。
OPTIMIZE deltalake_db.orders_prod;
-- 2. Z オーダー多次元クラスタリング (データスキッピング)。
-- これにより、同じ o_custkey を持つデータが物理的にまとめられ、「WHERE o_custkey = ...」を含むクエリが大幅に高速化されます。
OPTIMIZE deltalake_db.orders_prod ZORDER BY (o_custkey);
-- 3. ファイルの物理的なクリーンアップ (バキューム)。
-- 不要になった孤立ファイルを削除して、OSS スペースを解放します。
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM deltalake_db.orders_prod RETAIN 0 HOURS; -- テスト専用です。本番環境では 168 HOURS を使用してください。テーブル定義とデータの削除
DROP TABLE PURGE 文でファイルを物理的に削除するには、Spark を起動する前に次のパラメーターを設定する必要があります。設定を有効にするには、設定後に Spark を再起動する必要があります。
spark.hive.purgeExternalTableData.enabled=true.spark.hive.purgeManagedTableData.enabled=true
DROP TABLE IF EXISTS deltalake_db.orders_prod purge;