Apache Paimon(Paimon)は、プライマリキーテーブルと追加専用テーブルの 2 種類のテーブルをサポートしています。このトピックでは、プライマリキーテーブルと追加専用テーブルの機能について説明します。
プライマリキーテーブル
Paimon のプライマリキーテーブルには、テーブルの作成時に指定する必要がある 1 つ以上のプライマリキーがあります。
構文
次のサンプルコードは、プライマリキーテーブルを作成する方法を示しています。この例では、パーティションキーは dt、プライマリキーは dt、shop_id、user_id に設定され、テーブルのバケット数は 4 に設定されています。
CREATE TABLE T (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket' = '4'
);プライマリキーは、プライマリキーテーブル内の各データレコードを識別するために使用されます。 2 つのデータレコードのプライマリキーが同じ場合、データレコードはマージエンジンの設定に基づいて 1 つにマージされます。
バケットモード
バケットは、Paimon テーブルの読み取りおよび書き込み操作の最小単位です。パーティション化されていないテーブルまたはパーティション化されたテーブルのパーティションは、バケットに分割されます。これにより、並列読み取りと書き込みが可能になり、効率が向上します。次の表に、サポートされているバケットモードを示します。
モード | 定義 | 使用上の注意 |
動的バケットモード(デフォルト) | プライマリキーテーブルを作成するときに、WITH 句で |
|
固定バケットモード | プライマリキーテーブルを作成するときに、WITH 句で このモードでは、バケット数を変更できます。詳細については、このトピックの「固定バケットモードでのバケット数の変更」セクションを参照してください。 | このモードでは、プライマリキーに基づくクロスパーティション更新を防ぐために、プライマリキーにすべてのパーティションキーが含まれていることを確認してください。 |
動的バケットモードでのデータ更新
種類 | 使用上の注意 |
クロスパーティション更新 | 動的バケットモードを使用するプライマリキーテーブルでは、プライマリキーにすべてのパーティションキーが含まれていない場合に、クロスパーティション更新が発生します。このような場合、Paimon はプライマリキーのみでデータレコードのバケットとパーティションを判断できません。したがって、Paimon は RocksDB を使用して、プライマリキーとそれぞれのパーティションおよびバケットのマッピングを維持します。テーブルに大量のデータが含まれている場合、固定バケットモードと比較してパフォーマンスが大幅に低下する可能性があります。マッピングを RocksDB にロードする必要があるため、デプロイメントの初期化に時間がかかる場合があります。クロスパーティション更新の結果は、マージエンジンの設定によって異なります。次のマージエンジンがサポートされています。
|
パーティション内更新 | 動的バケットモードを使用するプライマリキーテーブルでは、プライマリキーにすべてのパーティションキーが含まれている場合に、パーティション内更新が発生します。このような場合、Paimon はプライマリキーに基づいてデータレコードのパーティションを判断できますが、対応するバケットを判断できません。したがって、Paimon はインデックスを作成して、プライマリキーとバケットのマッピングを維持します。 1 億のマッピングレコードごとに 1 GB のヒープメモリが使用されます。データが書き込まれているパーティションのみがヒープメモリを消費します。 動的バケットモードでは追加のヒープメモリが必要ですが、固定バケットモードと比較して大きなパフォーマンスの低下はありません。 |
バケット割り当て
モード | 説明 |
動的バケットモード | データは最初に既存のバケットに書き込まれます。バケット数が不十分な場合は、新しいバケットが自動的に作成されます。WITH 句で次のパラメーターを使用して、このモードを設定できます。
|
固定バケットモード | デフォルトでは、Paimon はデータレコードのプライマリキーに対して計算されたハッシュ値に基づいて、データレコードをバケットに割り当てます。 別の方法を使用するには、プライマリキーテーブルを作成するときに、WITH 句で |
固定バケットモードでのバケット数の変更
バケット数は、読み取りまたは書き込み操作の並列処理を決定します。バケット数が少なすぎると、各バケットのデータ量が大きくなり、パフォーマンスに影響します。バケット数が多すぎると、小さなファイルが多数作成されます。各バケットのデータの合計サイズを 2 GB に設定し、5 GB を超える値を指定しないことをお勧めします。固定バケットモードを使用するテーブルのバケット数を変更するには、次の手順を実行します。
テーブルにデータを書き込む、またはテーブルからデータを消費するすべてのデプロイメントを一時停止します。
スクリプトを作成し、次の SQL ステートメントを実行して bucket パラメーターを設定します。
ALTER TABLE `<catalog-name>`.`<database-name>`.`<table-name>` SET ('bucket' = '<bucket-num>');使用するパーティションのデータを再編成します。
パーティション化されていないテーブル:空のバッチドラフト[空のバッチドラフト] を作成し、エディターに次の SQL ステートメントを貼り付けてから、[デプロイ] と をクリックしてバッチデプロイメントを実行します。
INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>` SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`;パーティション化されたテーブル:空のバッチドラフト[空のバッチドラフト] を作成し、エディターに次の SQL ステートメントを貼り付けてから、[デプロイ] と をクリックしてバッチデプロイメントを実行します。
INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>` PARTITION (<partition-spec>) SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>` WHERE <partition-condition>;<partition-spec> と <partition-condition> を、再編成するパーティションに置き換えます。たとえば、
dt = 20240312, hh = 08は、dt フィールドが 20240312 で hh フィールドが 08 であるパーティションを指定します。INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>` PARTITION (dt = '20240312', hh = '08') SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>` WHERE dt = '20240312' AND hh = '08';
バッチデプロイメントが予期どおりに実行されたら、テーブルにデータを書き込む、またはテーブルからデータを消費するデプロイメントを再開します。
変更ログプロデューサー
ダウンストリームでストリーミングモードで消費できるようにするには、プライマリキーテーブルで INSERT、DELETE、および UPDATE 操作の完全な変更ログを生成する必要があります。変更ログは、データベースの binlog に似ています。変更ログを生成するために使用される方法を設定するには、WITH 句で changelog-producer パラメーターを指定します。次の表に、有効な値を示します。
有効な値 | 説明 | シナリオ |
none | プライマリキーテーブルは変更ログを生成しません。 | ストリーミングモードでのデータ消費を伴わないシナリオ。 |
input | プライマリキーテーブルは入力レコードをダウンストリームコンシューマーに渡します。 | 入力データストリームにデータベースの binlog などの完全な変更ログが含まれているシナリオ。 追加の計算が不要なため、このオプションが最も効率的です。 |
lookup | プライマリキーテーブルは、小さなファイルのコンパクションの結果に対してルックアップを実行して、完全な変更ログを生成します。小さなファイルのコンパクションは、Flink デプロイメントの各チェックポイントでトリガーされます。 | このオプションは、あらゆる種類の入力データストリームに適用できます。 full-compaction オプションと比較して、このオプションのレイテンシは低くなりますが、より多くのリソースを消費します。ビジネスで分単位のレイテンシが必要な場合は、このオプションをお勧めします。 |
full-compaction | プライマリキーテーブルは、小さなファイルのフルコンパクションが実行されるたびに完全な変更ログを生成します。 | このオプションは、あらゆる種類の入力データストリームに適用できます。 lookup オプションと比較して、このオプションのレイテンシは高くなりますが、消費するリソースは少なくなります。このオプションは、フルコンパクションプロセスを活用して追加の計算を防ぎ、リソース消費を削減します。ビジネスで数時間までのレイテンシに対応できる場合は、このオプションをお勧めします。 データの鮮度を確保するには、WITH 句で |
デフォルトでは、更新されたデータレコードが以前のデータレコードと同じであっても、Paimon は変更ログレコードを生成します。この問題を防ぐには、WITH 句で 'changelog-producer.row-deduplicate' = 'true' を指定します。この設定は、changelog-producer パラメーターを lookup または full-compaction に設定した場合にのみ有効です。更新前後の値の比較には追加の計算が必要になるため、変更ログに不要なレコードが多数生成される可能性がある場合にのみ、この設定を追加することをお勧めします。
マージエンジン
説明
Paimon テーブルが同じプライマリキーを持つ複数のデータレコードを受信した場合、データレコードは WITH 句で指定した merge-engine パラメーターに基づいてマージされます。有効な値:
first-row
aggregation
partial-update
詳細については、「マージエンジン」を参照してください。
順序が正しくないデータの処理
デフォルトでは、Paimon は入力順序に基づいてマージ順序を決定します。最後の入力レコードが最後にマージされるレコードです。入力ストリームに順序が正しくないデータレコードが含まれている場合は、WITH 句で 'sequence.field' = '<column-name> を指定します。このようにして、同じプライマリキーを持つデータレコードは、<column-name> で指定された列の値の昇順でマージされます。 sequence.field パラメーターは、TINYINT、SMALLINT、INTEGER、BIGINT、TIMESTAMP、TIMESTAMP_LTZ のデータ型をサポートしています。
MySQL コネクターを入力として使用し、op_t メタデータ列をシーケンスフィールドとして指定した場合、シーケンスフィールドの値は UPDATE_BEFORE と UPDATE_AFTER の変更のペアで同じになります。この問題を防ぐには、WITH 句で 'sequence.auto-padding' = 'row-kind-flag' を指定します。これにより、Paimon は UPDATE_AFTER の変更の前に UPDATE_BEFORE の変更を確実に処理します。
追加専用テーブル(主キーなし)
Paimon の追加専用テーブルにはプライマリキーがありません。この種類のテーブルでは、ストリーミングモードでの INSERT 操作のみが許可され、ログデータの同期など、ストリーミング更新を必要としないシナリオに適しています。
構文
次のサンプルコードは、追加専用テーブルを作成する方法を示しています。この例では、テーブルのパーティションキーは dt に設定されています。
CREATE TABLE T (
dt STRING
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) PARTITIONED BY (dt) WITH (
'bucket' = '-1'
);サブカテゴリ
サブカテゴリ | 定義 | 使用上の注意 |
追加スケーラブルテーブル | 追加専用テーブルを作成するときに、WITH 句で | この種類のテーブルは Hive テーブルに対応するもので、書き込み順序とは異なる順序でデータを消費できるシナリオに適しています。追加スケーラブルテーブルは、入力レコードのシャッフルの排除、データソートのサポート、柔軟な並列処理設定の提供、Hive テーブルからの直接変換のサポート、完全に非同期のファイルコンパクションのサポートといった方法を使用して書き込み効率を高めます。 |
追加キューテーブル | 追加専用テーブルを作成するときに、WITH 句で
| この種類のテーブルは、数分間のレイテンシを持つメッセージキューサービスに対応するものです。追加キューテーブルのバケット数は、Kafka トピックのパーティション数または ApsaraMQ for MQTT インスタンスのシャード数に相当します。 |
バケット割り当て
テーブルの種類 | 説明 |
追加スケーラブルテーブル | データは単一のパーティションに並列で書き込まれます。バケットの概念は無視され、データの順序は維持されません。つまり、データはライターに直接プッシュされ、ハッシュパーティション分割は不要です。したがって、この種類のテーブルは高い書き込みパフォーマンスを提供します。アップストリーム演算子とライターの並列処理が同じ場合、データの偏りが発生する可能性があることに注意してください。 |
追加キューテーブル | デフォルトでは、Paimon はデータレコードのすべての列の値に基づいて、データレコードをバケットに割り当てます。別の方法を使用するには、追加キューテーブルを作成するときに、WITH 句で たとえば、 説明
|
データ消費順序
テーブルの種類 | 説明 |
追加スケーラブルテーブル | この種類のテーブルは、書き込み順序とは異なる順序でデータを消費できるシナリオに適しています。 |
追加キューテーブル | この種類のテーブルでは、各バケットのレコードが書き込まれた順序で消費されることが保証されます。
|
参照
Paimon カタログと Paimon テーブルの作成方法については、「Apache Paimon カタログの管理」を参照してください。
Paimon カタログと Paimon テーブルの作成方法については、「Apache Paimon カタログの管理」を参照してください。
Paimon のプライマリキーテーブルのパフォーマンスを最適化する方法については、「パフォーマンスの最適化」を参照してください。