すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:プライマリキーテーブルと追加専用テーブル

最終更新日:Jan 07, 2025

Apache Paimon(Paimon)は、プライマリキーテーブルと追加専用テーブルの 2 種類のテーブルをサポートしています。このトピックでは、プライマリキーテーブルと追加専用テーブルの機能について説明します。

プライマリキーテーブル

Paimon のプライマリキーテーブルには、テーブルの作成時に指定する必要がある 1 つ以上のプライマリキーがあります。

構文

次のサンプルコードは、プライマリキーテーブルを作成する方法を示しています。この例では、パーティションキーは dt、プライマリキーは dt、shop_id、user_id に設定され、テーブルのバケット数は 4 に設定されています。

CREATE TABLE T (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '4'
);

プライマリキーは、プライマリキーテーブル内の各データレコードを識別するために使用されます。 2 つのデータレコードのプライマリキーが同じ場合、データレコードはマージエンジンの設定に基づいて 1 つにマージされます。

バケットモード

バケットは、Paimon テーブルの読み取りおよび書き込み操作の最小単位です。パーティション化されていないテーブルまたはパーティション化されたテーブルのパーティションは、バケットに分割されます。これにより、並列読み取りと書き込みが可能になり、効率が向上します。次の表に、サポートされているバケットモードを示します。

モード

定義

使用上の注意

動的バケットモード(デフォルト)

プライマリキーテーブルを作成するときに、WITH 句で bucket パラメーターを空のままにするか、'bucket' = '-1' を指定すると、動的バケットモードが使用されます。

  • このモードでは、プライマリキーテーブルは複数の Flink デプロイメントによる同時書き込みをサポートしていません。

  • このモードでは、プライマリキーテーブルはプライマリキーに基づくクロスパーティション更新をサポートしています。

固定バケットモード

プライマリキーテーブルを作成するときに、WITH 句で 'bucket' = '<num>' を指定すると、固定バケットモードが使用されます。テーブルがパーティション化されていない場合、<num> はテーブル全体で使用されるバケット数を指定します。テーブルがパーティション化されている場合、<num> は各パーティションで使用されるバケット数を指定します。<num> の値は 0 より大きい整数である必要があります。

このモードでは、バケット数を変更できます。詳細については、このトピックの「固定バケットモードでのバケット数の変更」セクションを参照してください。

このモードでは、プライマリキーに基づくクロスパーティション更新を防ぐために、プライマリキーにすべてのパーティションキーが含まれていることを確認してください。

動的バケットモードでのデータ更新

種類

使用上の注意

クロスパーティション更新

動的バケットモードを使用するプライマリキーテーブルでは、プライマリキーにすべてのパーティションキーが含まれていない場合に、クロスパーティション更新が発生します。このような場合、Paimon はプライマリキーのみでデータレコードのバケットとパーティションを判断できません。したがって、Paimon は RocksDB を使用して、プライマリキーとそれぞれのパーティションおよびバケットのマッピングを維持します。テーブルに大量のデータが含まれている場合、固定バケットモードと比較してパフォーマンスが大幅に低下する可能性があります。マッピングを RocksDB にロードする必要があるため、デプロイメントの初期化に時間がかかる場合があります。クロスパーティション更新の結果は、マージエンジンの設定によって異なります。次のマージエンジンがサポートされています。

  • deduplicate:既存のデータレコードを削除し、新しいデータレコードを指定されたパーティションに挿入します。

  • aggregation または partial-update:現在のパーティションにある既存のデータレコードを更新します。

  • first-row:既存のデータレコードを保持し、新しいデータレコードを破棄します。

パーティション内更新

動的バケットモードを使用するプライマリキーテーブルでは、プライマリキーにすべてのパーティションキーが含まれている場合に、パーティション内更新が発生します。このような場合、Paimon はプライマリキーに基づいてデータレコードのパーティションを判断できますが、対応するバケットを判断できません。したがって、Paimon はインデックスを作成して、プライマリキーとバケットのマッピングを維持します。

1 億のマッピングレコードごとに 1 GB のヒープメモリが使用されます。データが書き込まれているパーティションのみがヒープメモリを消費します。

動的バケットモードでは追加のヒープメモリが必要ですが、固定バケットモードと比較して大きなパフォーマンスの低下はありません。

バケット割り当て

モード

説明

動的バケットモード

データは最初に既存のバケットに書き込まれます。バケット数が不十分な場合は、新しいバケットが自動的に作成されます。WITH 句で次のパラメーターを使用して、このモードを設定できます。

  • dynamic-bucket.target-row-num:バケットに格納できるデータレコードの最大数。デフォルト値:2000000。

  • dynamic-bucket.initial-buckets:バケットの初期数。このパラメーターを空のままにすると、ライター演算子の並列処理が使用されます。

固定バケットモード

デフォルトでは、Paimon はデータレコードのプライマリキーに対して計算されたハッシュ値に基づいて、データレコードをバケットに割り当てます。

別の方法を使用するには、プライマリキーテーブルを作成するときに、WITH 句で bucket-key パラメーターを設定します。複数の列はコンマ(,)で区切ります。bucket-key パラメーターで指定された列がプライマリキーに含まれていることを確認してください。たとえば、'bucket-key' = 'c1,c2' を指定すると、Paimon はデータレコードの c1 列と c2 列の値に基づいてデータレコードのバケットを決定します。

固定バケットモードでのバケット数の変更

バケット数は、読み取りまたは書き込み操作の並列処理を決定します。バケット数が少なすぎると、各バケットのデータ量が大きくなり、パフォーマンスに影響します。バケット数が多すぎると、小さなファイルが多数作成されます。各バケットのデータの合計サイズを 2 GB に設定し、5 GB を超える値を指定しないことをお勧めします。固定バケットモードを使用するテーブルのバケット数を変更するには、次の手順を実行します。

  1. テーブルにデータを書き込む、またはテーブルからデータを消費するすべてのデプロイメントを一時停止します。

  2. スクリプトを作成し、次の SQL ステートメントを実行して bucket パラメーターを設定します。

    ALTER TABLE `<catalog-name>`.`<database-name>`.`<table-name>` SET ('bucket' = '<bucket-num>');
  3. 使用するパーティションのデータを再編成します。

    • パーティション化されていないテーブル:空のバッチドラフト[空のバッチドラフト] を作成し、エディターに次の SQL ステートメントを貼り付けてから、[デプロイ] と をクリックしてバッチデプロイメントを実行します。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`;
    • パーティション化されたテーブル:空のバッチドラフト[空のバッチドラフト] を作成し、エディターに次の SQL ステートメントを貼り付けてから、[デプロイ] と をクリックしてバッチデプロイメントを実行します。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (<partition-spec>)
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE <partition-condition>;

      <partition-spec> と <partition-condition> を、再編成するパーティションに置き換えます。たとえば、dt = 20240312, hh = 08 は、dt フィールドが 20240312 で hh フィールドが 08 であるパーティションを指定します。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (dt = '20240312', hh = '08')
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE dt = '20240312' AND hh = '08';
  4. バッチデプロイメントが予期どおりに実行されたら、テーブルにデータを書き込む、またはテーブルからデータを消費するデプロイメントを再開します。

変更ログプロデューサー

ダウンストリームでストリーミングモードで消費できるようにするには、プライマリキーテーブルで INSERT、DELETE、および UPDATE 操作の完全な変更ログを生成する必要があります。変更ログは、データベースの binlog に似ています。変更ログを生成するために使用される方法を設定するには、WITH 句で changelog-producer パラメーターを指定します。次の表に、有効な値を示します。

有効な値

説明

シナリオ

none

プライマリキーテーブルは変更ログを生成しません。

ストリーミングモードでのデータ消費を伴わないシナリオ。

input

プライマリキーテーブルは入力レコードをダウンストリームコンシューマーに渡します。

入力データストリームにデータベースの binlog などの完全な変更ログが含まれているシナリオ。

追加の計算が不要なため、このオプションが最も効率的です。

lookup

プライマリキーテーブルは、小さなファイルのコンパクションの結果に対してルックアップを実行して、完全な変更ログを生成します。小さなファイルのコンパクションは、Flink デプロイメントの各チェックポイントでトリガーされます。

このオプションは、あらゆる種類の入力データストリームに適用できます。

full-compaction オプションと比較して、このオプションのレイテンシは低くなりますが、より多くのリソースを消費します。ビジネスで分単位のレイテンシが必要な場合は、このオプションをお勧めします。

full-compaction

プライマリキーテーブルは、小さなファイルのフルコンパクションが実行されるたびに完全な変更ログを生成します。

このオプションは、あらゆる種類の入力データストリームに適用できます。

lookup オプションと比較して、このオプションのレイテンシは高くなりますが、消費するリソースは少なくなります。このオプションは、フルコンパクションプロセスを活用して追加の計算を防ぎ、リソース消費を削減します。ビジネスで数時間までのレイテンシに対応できる場合は、このオプションをお勧めします。

データの鮮度を確保するには、WITH 句で 'full-compaction.delta-commits' = '<num>' を指定します。これにより、Paimon は特定の数(<num>)の Flink チェックポイントの後で小さなファイルのフルコンパクションを実行できます。フルコンパクションはリソースを大量に消費することに注意してください。フルコンパクションの間隔を 30 分から 1 時間の値に設定することをお勧めします。

説明

デフォルトでは、更新されたデータレコードが以前のデータレコードと同じであっても、Paimon は変更ログレコードを生成します。この問題を防ぐには、WITH 句で 'changelog-producer.row-deduplicate' = 'true' を指定します。この設定は、changelog-producer パラメーターを lookup または full-compaction に設定した場合にのみ有効です。更新前後の値の比較には追加の計算が必要になるため、変更ログに不要なレコードが多数生成される可能性がある場合にのみ、この設定を追加することをお勧めします。

マージエンジン

説明

Paimon テーブルが同じプライマリキーを持つ複数のデータレコードを受信した場合、データレコードは WITH 句で指定した merge-engine パラメーターに基づいてマージされます。有効な値:

deduplicate(デフォルト)

'merge-engine' = 'deduplicate' を指定すると、Paimon は最新のデータレコードのみを保持し、同じプライマリキーを持つ他のデータレコードを破棄します。最新のデータレコードが DELETE レコードの場合、同じプライマリキーを持つすべてのデータレコードが削除されます。この例では、次のデータ定義言語(DDL)ステートメントを使用して Paimon テーブルを作成します。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine'='deduplicate' -- この設定はオプションです。deduplicate がデフォルト値であるため。
);
  • +I(1, 2.0, 'apple')+I(1, 4.0, 'banana')+I(1, 8.0, 'cherry') がテーブルに順番に書き込まれた場合、SELECT * FROM T WHERE k = 1 クエリの結果は (1, 8.0, 'cherry') になります。

  • +I(1, 2.0, 'apple')+I(1, 4.0, 'banana')-D(1, 4.0, 'banana') がテーブルに順番に書き込まれた場合、SELECT * FROM T WHERE k = 1 クエリではデータは取得されません。

first-row

'merge-engine' = 'first-row' を指定すると、複数のレコードのプライマリキーが同じ場合、Paimon は最初のデータレコードのみを保持します。 deduplicate エンジンと比較して、first-row エンジンは INSERT タイプの変更ログのみを生成するため、変更ログ生成の効率が向上します。

説明
  • ストリーミングモードでのデータ消費を可能にするには、changelog-producer パラメーターを lookup に設定します。

  • first-row エンジンは、DELETE および UPDATE_BEFORE の変更を処理できません。これらを無視するには、'first-row.ignore-delete' = 'true' を指定します。

  • first-row エンジンはシーケンスフィールドをサポートしていません。

この例では、次の DDL ステートメントを使用して Paimon テーブルを作成します。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'first-row'
);

+I(1, 2.0, 'apple')+I(1, 4.0, 'banana')+I(1, 8.0, 'cherry') がテーブルに順番に書き込まれた場合、SELECT * FROM T WHERE k = 1 クエリの結果は (1, 2.0, 'apple') になります。

aggregation

Paimon は、指定された集計関数に基づいて、同じプライマリキーを持つデータレコードの各列を集計します。プライマリキーに含まれていない各列の集計関数を指定するには、fields.<field-name>.aggregate-function を使用します。それ以外の場合は、last_non_null_value 集計関数が使用されます。

説明

ストリーミングモードでのデータ消費を可能にするには、changelog-producer パラメーターを lookup または full-compaction に設定します。

次の例では、price 列は max 関数を使用して集計され、sales 列は sum 関数を使用して集計されます。

CREATE TABLE T (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

+I(1, 23.0, 15)+I(1, 30.2, 20) がテーブルに順番に書き込まれた場合、SELECT * FROM T WHERE product_id = 1 クエリの結果は (1, 30.2, 35) になります。

次のリストは、サポートされている集計関数とデータ型を示しています。

  • sum:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE をサポートします。

  • product:DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE をサポートします。

  • count:INTEGER と BIGINT をサポートします。

  • max または min:CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、TIMESTAMP_LTZ をサポートします。

  • first_value または last_value:null を含むすべてのデータ型をサポートします。

  • first_not_null_value と last_non_null_value:すべてのデータ型をサポートします。

  • listagg:STRING をサポートします。

  • bool_and と bool_or:BOOLEAN

説明

上記の集計関数では、sum、product、count のみが更新の取消(UPDATE_BEFORE および DELETE の変更)をサポートしています。列の更新の取消を無視するには、'fields.<field-name>.ignore-retract' = 'true' を指定し、<field-name> を列の名前に置き換えます。

partial-update

'merge-engine' = 'partial-update' を指定すると、同じプライマリキーを持つレコードの最新値を使用して、既存のデータレコードの列を徐々に更新できます。 null 値は既存の値を上書きしません。

説明
  • ストリーミングモードでのデータ消費を可能にするには、changelog-producer パラメーターを lookup または full-compaction に設定します。

  • partial-update エンジンは、DELETE および UPDATE_BEFORE の変更を処理できません。これらの変更を無視するには、'partial-update.ignore-delete' = 'true' を指定します。

この例では、次の DDL ステートメントを使用して Paimon テーブルを作成します。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 BIGINT,
  v3 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update'
);

+I(1, 23.0, 10, NULL)+I(1, NULL, NULL, 'This is a book')+I(1, 25.2, NULL, NULL) がテーブルに順番に書き込まれた場合、SELECT * FROM T WHERE k = 1 クエリの結果は (1, 25.2, 10, 'This is a book') になります。

WITH 句でシーケンスグループと集計関数に関連するパラメーターを指定することもできます。

  • シーケンスグループを使用してマージ順序を指定する

    異なるテーブルの列を結合してワイドテーブルを作成する場合、シーケンスグループを使用して異なる列の partial-update 順序を指定し、順序が正しくないデータレコードを処理できます。

    次の例では、列 a と b は列 g_1 の値の昇順で更新され、列 c と d は列 g_2 の値の昇順で更新されます。

    CREATE TABLE T (
      k INT,
      a STRING,
      b STRING,
      g_1 INT,
      c STRING,
      d STRING,
      g_2 INT,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'merge-engine' = 'partial-update',
      'fields.g_1.sequence-group' = 'a,b',
      'fields.g_2.sequence-group' = 'c,d'
    );
  • シーケンスグループと集計関数を一緒に使用する

    WITH 句で fields.<field-name>.aggregate-function を指定して、<field-name> で指定された列に集計関数を適用することもできます。<field-name> で指定された列は、シーケンスグループに属している必要があります。 aggregation エンジンでサポートされているすべての集計関数を使用できます。

    次の例では、列 a と b は列 g_1 の値の昇順で更新されます。最終結果には、列 a の最新の null 以外の値と列 b の最大値が含まれます。列 c と d は列 g_2 の値の昇順で更新されます。最終結果には、列 c の最新の null 以外の値と列 b の値の合計が含まれます。

    CREATE TABLE T (
      k INT,
      a STRING,
      b INT,
      g_1 INT,
      c STRING,
      d INT,
      g_2 INT,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'merge-engine' = 'partial-update',
      'fields.g_1.sequence-group' = 'a,b',
      'fields.b.aggregate-function' = 'max',
      'fields.g_2.sequence-group' = 'c,d',
      'fields.d.aggregate-function' = 'sum'
    );

詳細については、「マージエンジン」を参照してください。

順序が正しくないデータの処理

デフォルトでは、Paimon は入力順序に基づいてマージ順序を決定します。最後の入力レコードが最後にマージされるレコードです。入力ストリームに順序が正しくないデータレコードが含まれている場合は、WITH 句で 'sequence.field' = '<column-name> を指定します。このようにして、同じプライマリキーを持つデータレコードは、<column-name> で指定された列の値の昇順でマージされます。 sequence.field パラメーターは、TINYINT、SMALLINT、INTEGER、BIGINT、TIMESTAMP、TIMESTAMP_LTZ のデータ型をサポートしています。

説明

MySQL コネクターを入力として使用し、op_t メタデータ列をシーケンスフィールドとして指定した場合、シーケンスフィールドの値は UPDATE_BEFORE と UPDATE_AFTER の変更のペアで同じになります。この問題を防ぐには、WITH 句で 'sequence.auto-padding' = 'row-kind-flag' を指定します。これにより、Paimon は UPDATE_AFTER の変更の前に UPDATE_BEFORE の変更を確実に処理します。

追加専用テーブル(主キーなし)

Paimon の追加専用テーブルにはプライマリキーがありません。この種類のテーブルでは、ストリーミングモードでの INSERT 操作のみが許可され、ログデータの同期など、ストリーミング更新を必要としないシナリオに適しています。

構文

次のサンプルコードは、追加専用テーブルを作成する方法を示しています。この例では、テーブルのパーティションキーは dt に設定されています。

CREATE TABLE T (
  dt STRING
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) PARTITIONED BY (dt) WITH (
  'bucket' = '-1'
);

サブカテゴリ

サブカテゴリ

定義

使用上の注意

追加スケーラブルテーブル

追加専用テーブルを作成するときに、WITH 句で 'bucket' = '-1' を指定すると、テーブルは追加スケーラブルテーブルになります。

この種類のテーブルは Hive テーブルに対応するもので、書き込み順序とは異なる順序でデータを消費できるシナリオに適しています。追加スケーラブルテーブルは、入力レコードのシャッフルの排除、データソートのサポート、柔軟な並列処理設定の提供、Hive テーブルからの直接変換のサポート、完全に非同期のファイルコンパクションのサポートといった方法を使用して書き込み効率を高めます。

追加キューテーブル

追加専用テーブルを作成するときに、WITH 句で 'bucket' = '<num>' を指定すると、テーブルは追加キューテーブルになります。

<num> の値は 0 より大きい整数である必要があり、パーティション化されていないテーブルで使用されるバケット数、またはパーティション化されたテーブルの各パーティションで使用されるバケット数を指定します。

この種類のテーブルは、数分間のレイテンシを持つメッセージキューサービスに対応するものです。追加キューテーブルのバケット数は、Kafka トピックのパーティション数または ApsaraMQ for MQTT インスタンスのシャード数に相当します。

バケット割り当て

テーブルの種類

説明

追加スケーラブルテーブル

データは単一のパーティションに並列で書き込まれます。バケットの概念は無視され、データの順序は維持されません。つまり、データはライターに直接プッシュされ、ハッシュパーティション分割は不要です。したがって、この種類のテーブルは高い書き込みパフォーマンスを提供します。アップストリーム演算子とライターの並列処理が同じ場合、データの偏りが発生する可能性があることに注意してください。

追加キューテーブル

デフォルトでは、Paimon はデータレコードのすべての列の値に基づいて、データレコードをバケットに割り当てます。別の方法を使用するには、追加キューテーブルを作成するときに、WITH 句で bucket-key パラメーターを設定します。複数の列はコンマ(,)で区切ります。

たとえば、'bucket-key' = 'c1,c2' を指定すると、Paimon はデータレコードの c1 列と c2 列の値に基づいてデータレコードのバケットを決定します。

説明

bucket-key パラメーターを指定することをお勧めします。これにより、バケット割り当て中の計算量が削減され、書き込み効率が向上します。

データ消費順序

テーブルの種類

説明

追加スケーラブルテーブル

この種類のテーブルは、書き込み順序とは異なる順序でデータを消費できるシナリオに適しています。

追加キューテーブル

この種類のテーブルでは、各バケットのレコードが書き込まれた順序で消費されることが保証されます。

  • 異なるパーティションからの 2 つのレコードの場合:

    • 'scan.plan-sort-partition' = 'true' を指定すると、パーティション値が小さい方のレコードが最初に消費されます。

    • それ以外の場合は、パーティションの作成時刻が早い方のレコードが最初に消費されます。

  • 同じパーティションの同じバケットからの 2 つのレコードの場合、最初に書き込まれたレコードが最初に消費されます。

  • 同じパーティションの異なるバケットからの 2 つのレコードの場合、異なるバケットは異なる Flink デプロイメントによって同時に処理される可能性があるため、消費順序は保証されません。

参照

Paimon カタログと Paimon テーブルの作成方法については、「Apache Paimon カタログの管理」を参照してください。