すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Spark SQL を使用した Delta Lake テーブルのライフサイクル管理

最終更新日:Jan 15, 2026

このトピックでは、AnalyticDB for MySQL で Spark SQL と Delta Lake テーブルフォーマットを使用して、エンタープライズデータレイクを構築し、データライフサイクルを管理する方法について説明します。

ソースレイヤーの取り込み (ブロンズレイヤー)

CREATE TABLE AS SELECT (CTAS) 文を使用すると、外部データを Delta Lake に迅速にインポートできます。この段階では、スキーマやパーティションを定義する必要はありません。

CREATE DATABASE IF NOT EXISTS deltalake_db LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/';
-- 1. メタデータをクリーンアップします。
DROP TABLE IF EXISTS deltalake_db.orders_staging;

-- 2. CTAS を使用してデータを迅速に取り込みます (パーティションなし、完全レプリケーション)。
CREATE OR REPLACE TABLE  deltalake_db.orders_staging
USING DELTA
COMMENT 'ブロンズレイヤー: Raw ステージングテーブル'
AS 
SELECT * FROM ADB_External_TPCH_10GB.external_orders;

-- 検証: データはロードされますが、パーティションが使用されていないため、全表スキャンのパフォーマンスは低下します。
SELECT count(*) FROM deltalake_db.orders_staging;

パーティションテーブルの構築

年ごとに物理的にパーティション化された本番テーブルを作成します。次に、orders_staging テーブルのデータをクリーンアップし、本番テーブルにロードします。このプロセスにより、スモールファイル問題が解決され、クエリ効率が向上します。

-- 1. メタデータをクリーンアップします。
DROP TABLE IF EXISTS deltalake_db.orders_prod;

-- 2. 本番テーブルのスキーマを定義します (明示的な o_order_year 列を含む)。
CREATE TABLE IF NOT EXISTS deltalake_db.orders_prod (
    o_orderkey LONG,
    o_custkey LONG,
    o_orderstatus STRING,
    o_totalprice DECIMAL(18,2),
    o_orderdate DATE,
    o_orderpriority STRING,
    o_clerk STRING,
    o_shippriority INT,
    o_comment STRING,
    o_order_year INT -- [明示的なパーティションキー列]
)
USING DELTA
PARTITIONED BY (o_order_year) -- 年ごとに物理的に分離します。
COMMENT 'シルバーレイヤー: パーティション化された本番テーブル';

-- 3. データを集計、クリーンアップ、書き込みます。
-- ステージングテーブルから読み取り、年を計算し、本番テーブルに書き込みます。
INSERT INTO deltalake_db.orders_prod
SELECT 
    -- ソーステーブルの元のフィールドを明示的にリストします (* は使用しません)。
    o_orderkey, 
    o_custkey, 
    o_orderstatus, 
    o_totalprice, 
    o_orderdate, 
    o_orderpriority, 
    o_clerk, 
    o_shippriority, 
    o_comment,
    -- パーティションフィールドを計算します。
    YEAR(o_orderdate) as o_order_year
FROM deltalake_db.orders_staging;

-- 検証: パーティション情報を表示します。
DESCRIBE DETAIL deltalake_db.orders_prod;

ビジネス上の変更とトランザクション (変更)

orders_prod 本番テーブルに対して、定期的なデータ修正操作を実行できます。

-- 1. 更新操作 (ACID)。
-- シナリオ: 1993 年の特定の店員のすべての注文ステータスを修正します。
UPDATE deltalake_db.orders_prod 
SET o_orderstatus = 'P' 
WHERE o_order_year = 1993 -- パーティションプルーニングを使用してファイルを迅速に検索します。
  AND o_clerk = 'Clerk#000000001';

-- 2. 削除操作。
DELETE FROM deltalake_db.orders_prod 
WHERE o_orderpriority = '5-LOW' AND o_order_year = 1992;

増分マージ (アップサート)

T+1 の増分データを処理するには、新しいデータを含む一時ビューを作成します。次に、MERGE 文を使用して、増分データを本番テーブルにマージします。

-- 増分データストリームをシミュレートします。
MERGE INTO deltalake_db.orders_prod AS target
USING (
    SELECT 
        999999999 AS o_orderkey, 
        12345 AS o_custkey, 
        'O' AS o_orderstatus, 
        100.00 AS o_totalprice, 
        cast('2025-05-20' as date) AS o_orderdate, 
        '1-URGENT' AS o_orderpriority, 
        'Clerk#Auto' AS o_clerk, 
        0 AS o_shippriority, 
        'New Order' AS o_comment,
        2025 AS o_order_year -- パーティションキー列を含める必要があります。
) AS source
ON target.o_orderkey = source.o_orderkey
WHEN MATCHED THEN UPDATE SET target.o_totalprice = source.o_totalprice
WHEN NOT MATCHED THEN INSERT *;

履歴の参照

データバージョン管理機能を使用して、データの以前のバージョンをクエリできます。

-- 1. 操作履歴を表示します。
DESCRIBE HISTORY deltalake_db.orders_prod;

-- 2. テーブル作成直後の状態にタイムトラベルします (バージョン 0 は CREATE TABLE、バージョン 1 は INSERT)。
-- バージョン 1 は、ステージングテーブルからデータがインポートされた直後の状態であると仮定します。
SELECT COUNT(*) FROM deltalake_db.orders_prod VERSION AS OF 1;

パフォーマンスの最適化 (メンテナンス)

本番テーブルを物理的に最適化できます。この機能は、Parquet テーブルに対する Delta Lake の主要な利点です。

-- 1. スモールファイルの圧縮 (コンパクション)。
-- これにより、同じパーティション内の散在するスモールファイルが、より大きなファイルにマージされます。
OPTIMIZE deltalake_db.orders_prod;

-- 2. Z オーダー多次元クラスタリング (データスキッピング)。
-- これにより、同じ o_custkey を持つデータが物理的にまとめられ、「WHERE o_custkey = ...」を含むクエリが大幅に高速化されます。
OPTIMIZE deltalake_db.orders_prod ZORDER BY (o_custkey);

-- 3. ファイルの物理的なクリーンアップ (バキューム)。
-- 不要になった孤立ファイルを削除して、OSS スペースを解放します。
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM deltalake_db.orders_prod RETAIN 0 HOURS; -- テスト専用です。本番環境では 168 HOURS を使用してください。

テーブル定義とデータの削除

説明

DROP TABLE PURGE 文でファイルを物理的に削除するには、Spark を起動する前に次のパラメーターを設定する必要があります。設定を有効にするには、設定後に Spark を再起動する必要があります。

  • spark.hive.purgeExternalTableData.enabled=true.

  • spark.hive.purgeManagedTableData.enabled=true

DROP TABLE IF EXISTS deltalake_db.orders_prod purge;