すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Flink を使用した Delta テーブルへのニアリアルタイムの部分列更新

最終更新日:May 24, 2025

このトピックでは、Delta テーブルにおける部分列更新のシナリオとパラメーター構成、および Flink Connector 向けに設計された 2 つの部分列更新モードと関連構成について説明します。

背景情報

  • UPSERT 操作: INSERT 機能と UPDATE 機能を組み合わせたデータベース機能です。 UPSERT によって処理される各レコード (または行) にプライマリキー列を含める必要があることで、効率的なデータ操作を実現します。

  • UPSERT の動作: 指定されたプライマリキーを持つデータがテーブルに存在するかどうかによって異なります。

    • 挿入セマンティクス: 指定されたプライマリキーを持つデータがテーブルに存在しない場合、UPSERT は挿入操作を実行してテーブルに新しいレコードを追加します。

    • 更新セマンティクス: 指定されたプライマリキーを持つレコードがテーブルに既に存在する場合、UPSERT は更新操作を実行して既存のデータを指定された新しいデータで更新します。

  • UPSERT シナリオ: 複数のテーブル結合を使用したストリーム処理では、2 つの異なるデータストリームからの更新が同じテーブルの異なる列に影響を与えます。

    • データストリーム StreamA は、列 ColumnX の更新を担当します。

    • データストリーム StreamB は、列 ColumnY の更新を担当します。

  • UPSERT 形式の比較:

    • 従来の UPSERT: StreamB からの更新は、StreamA によって行われた変更を上書きする可能性があり、データの不整合につながります。

    • 部分列更新機能: 同時更新中にストリーム間で競合が発生しないようにします。 各ストリームは、担当する列のみを更新し、同じ行のすべてのストリームからの更新結果を保持します。

シナリオ

シナリオ 1: 同じ行の異なる列を干渉なしで更新する

ユーザーデータをリアルタイムで処理および更新する必要があるユーザー情報管理システムがあるとします。 データは、異なるデータソースから情報を受信する 2 つの独立したサービスストリームによって処理されます。

  • データストリーム StreamA は、名前、年齢、性別などのユーザーの個人情報の処理を担当します。

  • データストリーム StreamB は、メールアドレスや電話番号などのユーザーの連絡先情報の処理を担当します。

実際のビジネス運用では、ユーザーの個人情報と連絡先情報はほぼ同時に変更される可能性があります。 これらの更新が互いに上書きされることなく、ユーザー情報管理システムにすぐに反映されるようにする必要があります。

操作プロセス

  1. ユーザーが異なるプラットフォームで自分の名前と電話番号を更新します。 StreamA は名前の更新を受信し、StreamB は電話番号の更新を受信します。

  2. StreamAStreamB の両方が、ユーザー情報管理システムに更新を送信します。

最終結果

  • 部分列更新なし: StreamB の更新が StreamA の後に到着して処理された場合、StreamA によって更新されたばかりの名前情報が上書きされ ( StreamB が行全体を更新する場合 )、名前が古い値に戻ります。

  • 部分列更新あり:

    • StreamA が更新を実行する場合、連絡先情報列に影響を与えることなく、名前列のみを操作します。

    • StreamB が更新を実行する場合、個人情報列に影響を与えることなく、電話番号列のみを操作します。

    最終的な結果は、ユーザーの名前が最新の情報に更新され、電話番号も最新の情報に更新されることです。 これらの更新は干渉なしで独立して実行され、ユーザー情報の整合性と正確性が保証されます。

実際のアプリケーションでは、部分列更新機能はユーザー情報などのデータの処理に不可欠です。 この機能は、リアルタイムのデータ更新を保証するだけでなく、データの不整合の問題を効果的に防ぎます。

シナリオ 2: 行内の一部のフィールドを更新し、他のフィールドは変更しない

ユーザーデータをリアルタイムで処理および更新する必要があるユーザー情報管理システムがあるとします。 データは、異なるデータソースから情報を受信する 2 つの独立したサービスストリームによって処理されます。

  • StreamA は、名前、年齢、性別などのユーザーの個人情報と、メールアドレスや電話番号などのユーザーの連絡先情報の更新を担当します。

  • StreamB は、名前、年齢、性別などのユーザーの個人情報と、メールアドレスや電話番号などのユーザーの連絡先情報の更新を担当します。 タスクは StreamA と同じです。

操作プロセス

  1. StreamA は、INSERT INTO table (pk, age) VALUES (1, 3) ; のようなコマンドで、ユーザーの年齢のみを更新したいと考えています。

  2. 同時に、StreamB は、INSERT INTO table (pk, sex) VALUES (1, 'male') ; のようなコマンドで、ユーザーの性別のみを更新したいと考えています。

最終結果

  • 部分列更新なし: プライマリキー 1 のレコードが上記の更新コマンドを受信すると、更新されているフィールド以外のすべてのフィールドが NULL に設定されます。 これにより、元の有効なデータが失われます。

  • 動的更新あり:

    • 挿入操作がトリガーされると、システムは一部のフィールドにのみデータが含まれていることを識別します。

    • 部分列更新メカニズムにより、データを持つこれらのフィールドのみが更新されます。

    • 同時に、挿入操作でデータが提供されていないフィールドは、元の値を維持します。

この自動識別および更新戦略を実装することにより、ユーザー情報管理システムは、既存の有効なデータを失うことなく、変更する必要があるフィールドのみを正確に更新できます。 これにより、データ管理の柔軟性と正確性が大幅に向上し、データ整合性の維持に強力な保護を提供します。

Flink Connector モードの概要

Delta テーブルにおける 部分列更新 の使用シナリオに基づいて、Flink Connector 向けに 2 つの部分列更新モードが設計されており、さまざまなデータ更新要件に対応します。

静的モード

静的モードでは、ユーザーはデータストリームによって更新される列を事前に指定する必要があります。 これらの指定された列は、通常の UPSERT ロジックに従います。

  • プライマリキーが存在する場合は、データを更新します。

  • プライマリキーが存在しない場合は、新しいデータを挿入します。

同時に、更新に指定されていない列は既存の値を維持します。 このモードは、頻繁に変更されることが予想される列に適しています。

動的モード

動的モードは、システムにより高いインテリジェンスと適応性を与えます。 このモードでは、システムはデータストリーム内のどの列に NULL 以外の値が含まれているかを自動的に検出し、値を持つ列のみを更新できます。 つまり、値のない ( NULL 値を持つ) データストリームの列は変更されません。 動的モードは、どの列が変更されるかを事前に判断できない状況に特に適しており、各データストリーム更新の正確性と効率性を保証します。

これらの 2 つの更新モードを導入することにより、Flink Connector はユーザーにより柔軟で強力なデータ処理機能を提供し、実際の状況に基づいて最も適切なデータ更新戦略を選択できるようにすることで、データの正確性と整合性を保証します。

次の表は、異なるモードを使用して同じデータを更新した後の結果を示しています。

説明

この例では、最初の列 a がプライマリキーです。 静的モードでは、プライマリキー列がデフォルトで選択されます。

モード

初期データ

ステップ 1: (a, b, c) を更新した後のデータ

ステップ 2: (a, d, null) を更新した後のデータ

ステップ 3: (a, null, e) を更新した後の最終データ

標準モード

(null, null, null)

(a, b, c)

(a, d, null)

(a, null, e)

動的モード

(null, null, null)

(a, b, c)

(a, d, c)

(a, d, e)

静的モード ( 2 番目の列が更新に指定されている)

(null, null, null)

(a, b, null)

(a, d, null)

(a, null, null)

使用方法

Delta テーブルを作成し、部分列更新を有効にする

tblpropertiesacid.partial.fields.update.enable=true パラメーターを構成するという具体的な方法です。 詳細については、「Delta テーブルのパラメーター」をご参照ください。

構文例:

CREATE TABLE IF NOT EXISTS partial_upsert_test
  (pk INT NOT NULL, 
   c1 STRING, 
   c2 STRING, 
   c3 STRING, 
   primary key(pk)) 
TBLPROPERTIES('transactional'='true', 'acid.partial.fields.update.enable'='true');

Flink Connector の構成例

パラメーター

部分列更新モードを構成するために、MaxCompute は次の 2 つの構成パラメーターを導入しました。

パラメーター

説明

upsert.partial-column.enable

このパラメーターは、部分列更新機能を有効にするために使用されます。 列名が指定されていない場合 ( upsert.partial-column.name パラメーターが空のままの場合 )、システムは更新に動的モード ( NULL 以外のフィールドを更新) を使用します。

upsert.partial-column.name

このパラメーターは、更新する必要がある列を指定するために使用されます。 このパラメーターが設定されている場合、システムはリストされているフィールドのみを更新し、他のフィールドは元の値を維持します。

説明

プライマリキー列はデフォルトで選択されます。 パーティションキー列名をこのパラメーターに追加することはできません

動的部分列更新の構成例

動的部分列更新が有効になっているテーブルを作成します。 例:

CREATE TABLE partialtable (
  pk INT,
  c1 STRING, 
  c2 STRING, 
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC ネットワーク接続
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true', //3 層モデルをサポートします。
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true', 
  'odps.access.id' = 'yourAccessId',
  'odps.access.key' = 'yourAccessKey'
);

テーブルに対する後続の操作とその結果の例:

  1. 最初の列がプライマリキーであるデータ [1,a, b, c] をテーブルに挿入します。 初期データは [1, a, b, c] です。

    INSERT INTO partialtable VALUES (1, 'a', 'b', 'c'); 
  2. プライマリキー 1 のレコードの 2 番目の列 c2 のみを d に更新します。 更新後のデータは [1, a, d, c] です。

    INSERT INTO partialtable(pk, c2) VALUES (1, 'd'); 
  3. プライマリキー 1 のレコードの 3 番目の列 c3 のみを e に更新します。 更新後のデータは [1, a, d, e] です。

    INSERT INTO partialtable(pk, c3) VALUES (1, 'e'); 

静的部分列更新の構成例

c2 列のみを更新するテーブルを作成します。 このテーブルに対する後続の操作は c2 列にのみ影響し、他の列は変更されません。 例:

CREATE TABLE PartialTable2 (
  pk INT,
  c1 STRING, 
  c2 STRING, 
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC ネットワーク接続
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true', //3 層モデルをサポートします。
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true', 
  'upsert.partial-column.name' = 'c2', // c2 列のみを更新するように指定します。
  'odps.access.id' = 'yourAccessId',
  'odps.access.key' = 'yourAccessKey'
);
説明

upsert.partial-column.name パラメーターを構成する場合は、Flink 内部テーブルの列名ではなく、MaxCompute 内のテーブルに対応する列名を使用する必要があります。 これにより、Flink がストレージシステム内の対応する列を正しく識別して更新できるようになります。

関連情報