MaxCompute では、OSS バケットに格納された Paimon テーブルのディレクトリをマッピングする Paimon 外部テーブルを作成し、そのデータにアクセスできます。本トピックでは、Paimon 外部テーブルの作成方法および MaxCompute からのクエリ実行方法について説明します。
機能概要
Apache Paimon は、バッチおよびストリーミングワークロード向けの統合型レイクストレージフォーマットであり、高スループットな書き込みと低遅延のクエリを提供します。Spark、Hive、Trino などの一般的なコンピュートエンジンは、Realtime Compute for Apache Flink や E-MapReduce とシームレスに連携可能です。Apache Paimon を使用すると、OSS 上で迅速にデータレイクを構築し、MaxCompute と接続して分析処理を実行できます。また、メタデータフィルタリングにより、読み取り操作時に不要な OSS ディレクトリファイルをスキップすることで、クエリパフォーマンスがさらに最適化されます。
適用範囲
基盤となる Paimon ファイルのスキーマが変更されても、Paimon 外部テーブルのスキーマは自動的に更新されません。
Paimon 外部テーブルには、クラスター属性やプライマリキーを設定できません。
Paimon 外部テーブルでは、データの履歴バージョンをクエリすることはできません。
Paimon 外部テーブルに対して直接データを書き込まないでください。代わりに、UNLOAD などの方法を使用してデータを OSS へエクスポートしてください。
INSERT INTO 文または INSERT OVERWRITE 文を使用して、Paimon 外部テーブルへデータを書き込むことができます。ただし、Dynamic Bucket テーブルおよび Cross Partition テーブルへの書き込みはサポートされていません。
Paimon 外部テーブルでは、UPDATE 操作および DELETE 操作はサポートされていません。
MaxCompute と OSS は、同一リージョン内に配置する必要があります。
Paimon 外部テーブルの作成
構文構造
さまざまな形式の外部テーブルの構文の詳細については、「OSS 外部テーブル」をご参照ください。
CREATE EXTERNAL TABLE [IF NOT EXISTS] <mc_oss_extable_name>
(
<col_name> <data_type>,
...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
WITH serdeproperties (
'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole'
)
LOCATION '<oss_location>';共通パラメーター
共通パラメーターの詳細については、「基本構文パラメーター」をご参照ください。
データの書き込み
MaxCompute の書き込み構文の詳細については、「書き込み構文」をご参照ください。
クエリ分析
Paimon テーブルのデータ分割(Split)ロジックは、ネイティブな MaxCompute テーブルとは異なります。Paimon は独自の内部ファイル構成およびシャーディング機構を使用しており、これは MaxCompute のパラメーターと完全には一致しません。
SELECT 構文の詳細については、「クエリ構文の説明」をご参照ください。
クエリプランの最適化に関する詳細については、「クエリ最適化」をご参照ください。
BadRowSkipping の詳細については、「BadRowSkipping」をご参照ください。
使用例
ステップ 1:前提条件
すでにMaxCompute プロジェクトを作成済みです。
OSS バケットおよび OSS ディレクトリを準備済みである必要があります。詳細については、「バケットの作成」および「フォルダの管理」をご参照ください。
MaxCompute は特定のリージョンでのみ利用可能であるため、クロスリージョン接続の問題が発生する可能性があります。そのため、MaxCompute プロジェクトと同じリージョンにある OSS バケットの使用を推奨します。
権限付与
OSS へのアクセス権限が必要です。Alibaba Cloud アカウント(ルートユーザー)、Resource Access Management (RAM) ユーザー、または RAM ロールを使用して、OSS 外部テーブルにアクセスできます。権限付与の詳細については、「OSS に対する STS モードによる権限付与」をご参照ください。
MaxCompute プロジェクトで CreateTable 権限を持っている必要があります。テーブル操作権限の詳細については、「MaxCompute 権限」をご参照ください。
ステップ 2:Flink におけるデータの準備
Paimon カタログおよび Paimon テーブルを作成し、以下の例に示すようにテーブルにデータを挿入します。
OSS 内に既に Paimon テーブルのデータが存在する場合は、このステップをスキップしてください。
Paimon Filesystem カタログの作成
Flink コンソールにログインし、左上隅からリージョンを選択します。
対象のワークスペース名をクリックし、左側のナビゲーションウィンドウで Catalogs を選択します。
Catalog List ページで、右側の Create Catalog をクリックします。Create Catalog ダイアログボックスで Apache Paimon を選択し、Next をクリックして、以下のパラメーターを設定します:
パラメーター
必須
説明
metastore
必須。
メタストアのタイプです。本例では、
filesystemを選択します。catalog name
必須
任意のカタログ名(例:
paimon-catalog)。warehouse
必須
OSS 内のデータウェアハウスディレクトリです。本例では、
oss://paimon-fs/paimon-test/を使用します。fs.oss.endpoint
必須
OSS エンドポイントです。たとえば、中国 (杭州) リージョンのエンドポイントは
oss-cn-hangzhou-internal.aliyuncs.comです。fs.oss.accessKeyId
必須
OSS へのアクセスに必要な AccessKey ID です。
fs.oss.accessKeySecret
必須
OSS へのアクセスに必要な AccessKey Secret です。
Paimon テーブルの作成
Flink コンソールにログインし、左上隅からリージョンを選択します。
対象のワークスペース名をクリックし、左側のナビゲーションウィンドウで を選択します。
New Script タブで、
をクリックして新しいクエリスクリプトを作成できます。以下のコマンドを入力し、[実行] をクリックします。
CREATE TABLE `paimon_catalog`.`default`.test_tbl ( id BIGINT, data STRING, dt STRING, PRIMARY KEY (dt, id) NOT ENFORCED ) PARTITIONED BY (dt); INSERT INTO `paimon-catalog`.`default`.test_tbl VALUES (1,'CCC','2024-07-18'), (2,'DDD','2024-07-18');
SQL ジョブがレート制限されている場合(例:
INSERT INTO ... VALUES ...文の実行時)は、以下の手順を実行します:対象のワークスペース名をクリックし、左側のナビゲーションウィンドウで を選択します。
Deployments ページで、対象ジョブの名前をクリックして、その Configuration ページを開きます。
ジョブ実行時パラメーターの設定方法の詳細については、「ジョブデプロイメント情報の設定」をご参照ください。
ステップ 3:MaxCompute における Paimon 外部テーブルの作成
以下の SQL コードを MaxCompute で実行して、Paimon 外部テーブルを作成します。
CREATE EXTERNAL TABLE oss_extable_paimon_pt
(
id BIGINT,
data STRING
)
PARTITIONED BY (dt STRING )
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
WITH serdeproperties (
'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole'
)
LOCATION 'oss://oss-cn-<your region>-internal.aliyuncs.com/<table_path>'
;上記コードにおいて、table_path は、Flink で作成した Paimon テーブルのパス(例:paimon-fs/paimon-test/default.db/test_tbl)です。このパスを取得するには、以下の手順を実行します:
Flink コンソールにログインし、左上隅からリージョンを選択します。
対象のワークスペース名をクリックし、左側のナビゲーションウィンドウで Catalogs を選択します。
ステップ 4:パーティションデータの読み込み
作成した OSS 外部テーブルがパーティション化されている場合、パーティションデータを個別に読み込む必要があります。詳細については、「OSS 外部テーブルのパーティションデータ読み込み構文」をご参照ください。
MSCK REPAIR TABLE oss_extable_paimon_pt ADD PARTITIONS;ステップ 5:MaxCompute からの Paimon 外部テーブルの読み取り
以下のコマンドを MaxCompute で実行して、Paimon 外部テーブル oss_extable_paimon_pt をクエリします。
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
SELECT * FROM oss_extable_paimon_pt WHERE dt='2024-07-18';結果は以下のとおりです:
+------------+------------+------------+
| id | data | dt |
+------------+------------+------------+
| 1 | CCC | 2024-07-18 |
| 2 | DDD | 2024-07-18 |
+------------+------------+------------+Paimon ファイルのスキーマと外部テーブルのスキーマが異なる場合:
列数の不一致: Paimon ファイルの列数が外部テーブル DDL で定義された列数より少ない場合、読み取り時に不足分の列値は NULL で埋められます。逆に、列数が多すぎる場合は、余分な列は破棄されます。
データの型の不一致: MaxCompute では、Paimon ファイルの STRING データを INT 列として読み取ることはサポートされていません。一方、INT データを STRING 列として読み取ることは可能ですが、推奨されません。
サポートされるデータの型
MaxCompute のデータの型については、「データの型(第 1 版)」および「データの型(第 2 版)」をご参照ください。
オープンソース Paimon のデータの型 | MaxCompute 2.0 のデータの型 | 読み取り/書き込みのサポート状況 | 説明 |
TINYINT | TINYINT | 8 ビット符号付き整数。 | |
SMALLINT | SMALLINT | 16 ビット符号付き整数。 | |
INT | INT | 32 ビット符号付き整数。 | |
BIGINT | BIGINT | 64 ビット符号付き整数。 | |
BINARY(MAX_LENGTH) | BINARY | バイナリデータ型。最大長は 8 MB です。 | |
FLOAT | FLOAT | 32 ビットバイナリ浮動小数点数。 | |
DOUBLE | DOUBLE | 64 ビットバイナリ浮動小数点数。 | |
DECIMAL(precision,scale) | DECIMAL(precision,scale) | 正確な 10 進数。デフォルトは
| |
VARCHAR(n) | VARCHAR(n) | 可変長文字列。n は長さで、1~65535 の範囲です。 | |
CHAR(n) | CHAR(n) | 固定長文字列。n は長さで、1~255 の範囲です。 | |
VARCHAR(MAX_LENGTH) | STRING | 文字列型。最大長は 8 MB です。 | |
DATE | DATE | 日付フォーマット: | |
TIME, TIME(p) | サポートされていません | Paimon の TIME 型はタイムゾーンなしの時刻を表し、時間・分・秒から構成され、ナノ秒精度です。 TIME(p) は小数点以下の秒の精度(0~9)を指定します(デフォルトは 0)。 MaxCompute には対応するデータの型がありません。 | |
TIMESTAMP, TIMESTAMP(p) | TIMESTAMP_NTZ | タイムゾーンなしのタイムスタンプで、ナノ秒精度です。 この型を読み取るには、ネイティブモードスイッチを有効化します: | |
TIMESTAMP WITH LOCAL TIME_ZONE(9) | TIMESTAMP |
| |
TIMESTAMP WITH LOCAL TIME_ZONE(9) | DATETIME | ナノ秒精度のタイムスタンプ型です。 フォーマット: | |
BOOLEAN | BOOLEAN | ブール値型。 | |
ARRAY | ARRAY | 複合型。 | |
MAP | MAP | 複合型。 | |
ROW | STRUCT | 複合型。 | |
MULTISET<t> | サポートされていません | MaxCompute には対応するデータの型がありません。 | |
VARBINARY, VARBINARY(n), BYTES | BINARY | 可変長バイナリ文字列。 |
よくある質問
Paimon 外部テーブルの読み取り時に kSIGABRT エラーが発生する
エラーメッセージ:
ODPS-0123144: Fuxi ジョブが失敗しました - kSIGABRT(errCode:6) at Odps/*****_SQL_0_1_0_job_0/M1@f01b17437.cloud.eo166#3. 詳細なエラーメッセージ:CRASH_CORE、JVM クラッシュが原因である可能性があります。Java UDF/UDAF/UDTF を確認してください。 | fatalInstance: Odps/*****_SQL_0_1_0_job_0/M1#0_0原因:
JNI モードで TIMESTAMP_NTZ データを読み取ろうとした際に発生します。
解決策:
テーブルから読み取る前に、
SET odps.sql.common.table.jni.disable.native=true;コマンドを実行してネイティブ機能を無効化します。
参考
Flink でカスタムカタログとして MaxCompute の Paimon 外部テーブルを作成し、そこにデータを書き込んだ後、MaxCompute から Paimon データをクエリおよび利用できます。詳細については、「Flink を基にした MaxCompute Paimon 外部テーブルの作成」をご参照ください。