このトピックでは、Apache Iceberg コネクタの使用方法について説明します。
背景情報
Apache Iceberg は、データレイク向けのオープンなテーブル形式です。 Apache Iceberg を使用すると、Hadoop 分散ファイルシステム ( HDFS ) または Alibaba Cloud Object Storage Service ( OSS ) 上に独自のデータレイクストレージサービスを迅速に構築できます。 その後、Apache Flink、Apache Spark、Apache Hive、Apache Presto などのオープンソースビッグデータエコシステムのコンピューティングエンジンを使用して、データレイク内のデータを分析できます。 次の表に、Apache Iceberg コネクタの概要を示します。
項目 | 説明 |
テーブルタイプ | ソーステーブルとシンクテーブル |
実行モード | バッチモードとストリーミングモード |
データ形式 | 該当なし |
メトリック | 該当なし |
API タイプ | SQL API |
シンクテーブルでのデータの更新または削除 | サポートされています |
機能
Apache Iceberg コネクタは、次のコア機能を提供します。
HDFS または OSS に基づいて、低コストの軽量データレイクストレージサービスを構築します。
包括的な原子性、一貫性、分離性、耐久性 ( ACID ) セマンティクスを提供します。
履歴バージョンのバックトラッキングをサポートします。
効率的なデータフィルタリングをサポートします。
スキーマ進化をサポートします。
パーティション進化をサポートします。
セルフマネージド Hive メタストアへのデータストレージをサポートします。 詳細については、「Apache Iceberg ソーステーブルの作成と使用」をご参照ください。
Flink の強力なフォールトトレランスとストリーム処理機能を使用して、ログ内の大量の行動データをリアルタイムで Apache Iceberg データレイクにインポートできます。 その後、Flink または別の分析エンジンを使用して、データの値を抽出できます。
制限事項
Ververica Runtime ( VVR ) 4.0.8 以降を使用する Apache Flink 用 Realtime Compute のみ、Apache Iceberg コネクタをサポートしています。 Apache Iceberg コネクタは、Data Lake Formation ( DLF ) カタログと一緒に使用する必要があります。 詳細については、「DLF カタログの管理」をご参照ください。
Apache Iceberg コネクタは、バージョン 1 とバージョン 2 の Apache Iceberg テーブル形式をサポートしています。 詳細については、「Iceberg Table Spec」をご参照ください。
説明VVR 8.0.7 以降を使用する Apache Flink 用 Realtime Compute のみ、バージョン 2 の Apache Iceberg テーブル形式をサポートしています。
ストリーミング読み取りモードが有効になっている場合、Append Only モードでデータが書き込まれた Apache Iceberg テーブルのみをソーステーブルとして使用できます。
構文
CREATE TABLE iceberg_table (
id BIGINT,
data STRING
PRIMARY KEY(`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
'connector' = 'iceberg',
...
);WITH 句のコネクタオプション
共通オプションとソーステーブルオプション
オプション | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
connector | 使用するコネクタ。 | STRING | はい | デフォルト値なし | 値を |
catalog-name | カタログの名前。 | STRING | はい | デフォルト値なし | カタログの名前を入力します。 |
catalog-database | データベースの名前。 | STRING | はい | default | Data Lake Formation ( DLF ) で作成されたデータベースの名前に設定します。 例: dlf_db。 説明 DLF データベースを作成していない場合は、作成してください。 |
io-impl | 分散ファイルシステムの実装クラスの名前。 | STRING | はい | デフォルト値なし | 値を |
oss.endpoint | OSS バケットのエンドポイント。 | STRING | いいえ | デフォルト値なし | 詳細については、「リージョンとエンドポイント」をご参照ください。 説明
|
| Alibaba Cloud アカウントの AccessKey ID。 | STRING | はい | デフォルト値なし | 詳細については、「アカウントの AccessKey ペアを表示するにはどうすればよいですか。」をご参照ください。 重要 資格情報のセキュリティを向上させるには、プレーンテキストで AccessKey ペアをハードコーディングすることは避け、代わりに変数を使用してください。詳細については、「キーの管理」をご参照ください。 |
| Alibaba Cloud アカウントの AccessKey シークレット。 | STRING | はい | デフォルト値なし | |
catalog-impl | カタログのクラス名。 | STRING | はい | デフォルト値なし | 値を org.apache.iceberg.aliyun.dlf.DlfCatalog に設定します。 |
warehouse | テーブルデータが格納される OSS ディレクトリ。 | STRING | はい | デフォルト値なし | 該当なし。 |
dlf.catalog-id | Alibaba Cloud アカウントの ID。 | STRING | はい | デフォルト値なし | セキュリティ設定 ページにアクセスして、アカウント ID を取得できます。 |
dlf.endpoint | DLF のエンドポイント。 | STRING | はい | デフォルト値なし | 説明
|
dlf.region-id | DLF サービスが有効になっているリージョンの名前。 | STRING | はい | デフォルト値なし | 説明 選択したリージョンが dlf.endpoint に選択したエンドポイントと一致していることを確認してください。 |
uri | Hive メタストアの thrift URI。 | STRING | いいえ | デフォルト値なし | セルフマネージド Hive メタストアで使用されます。 説明 このオプションは、Hive カタログを使用する場合にのみ必要です。 |
シンク専用オプション
オプション | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
write.operation | 書き込み操作モード。 | STRING | いいえ | upsert | 有効な値:
|
hive_sync.enable | メタデータを Hive に同期するかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
|
hive_sync.mode | Hive データ同期モード。 | STRING | いいえ | hms |
|
hive_sync.db | データを同期する Hive データベースの名前。 | STRING | いいえ | カタログ内の現在のテーブルのデータベース名 | 該当なし。 |
hive_sync.table | データを同期する Hive テーブルの名前。 | STRING | いいえ | 現在のテーブルの名前 | 該当なし。 |
dlf.catalog.region | DLF サービスが有効になっているリージョンの名前。 | STRING | いいえ | デフォルト値なし | 説明
|
dlf.catalog.endpoint | DLF のエンドポイント。 | STRING | いいえ | デフォルト値なし | 説明
|
データ型マッピング
Apache Iceberg のデータ型 | Apache Flink 用 Realtime Compute のデータ型 |
BOOLEAN | BOOLEAN |
INT | INT |
LONG | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(P,S) | DECIMAL(P,S) |
DATE | DATE |
TIME | TIME 説明 Apache Iceberg のタイムスタンプはマイクロ秒単位の精度であり、Apache Flink 用 Realtime Compute のタイムスタンプはミリ秒単位の精度です。 Apache Flink 用 Realtime Compute を使用して Apache Iceberg データを読み取ると、時間の精度はミリ秒に調整されます。 |
TIMESTAMP | TIMESTAMP |
TIMESTAMPTZ | TIMESTAMP_LTZ |
STRING | STRING |
FIXED(L) | BYTES |
BINARY | VARBINARY |
STRUCT<...> | ROW |
LIST<E> | LIST |
MAP<K,V> | MAP |
サンプルコード
OSS バケットと DLF データベースが作成されていることを確認します。 詳細については、「バケットの作成」および「データベーステーブルと関数」をご参照ください。
DLF データベースの [ディレクトリ] を指定する場合は、${warehouse}/${database_name}.db 形式のディレクトリを入力することをお勧めします。 たとえば、warehouse オプションの値が oss://iceberg-test/warehouse で、database_name オプションの値が dlf_db の場合、dlf_db データベースの OSS ディレクトリを oss://iceberg-test/warehouse/dlf_db.db に設定します。
Apache Iceberg シンクテーブルの作成と使用
Datagen コネクタを使用してランダムなストリーミングデータを生成し、Apache Iceberg に書き込みます。
CREATE TEMPORARY TABLE datagen(
id BIGINT,
data STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE dlf_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
INSERT INTO dlf_iceberg SELECT * FROM datagen;Apache Iceberg ソーステーブルの作成と使用
Hive カタログで管理され、セルフマネージド Hive メタストアに格納されている Iceberg テーブルにマッピングする Flink テーブルを作成します。
説明Flink ワークスペースとセルフマネージド Hive メタストアクラスター間の接続が作成されていることを確認してください。
データファイルは
oss://<bucket>/<path>/<database-name>/flink_tableディレクトリに格納されます。
CREATE TEMOPORY TABLE flink_table ( id BIGINT, data STRING ) WITH ( 'connector'='iceberg', 'catalog-name'='<yourCatalogName>', 'catalog-database'='<yourDatabaseName>', 'uri'='thrift://<ip>:<post>', 'warehouse'='oss://<bucket>/<path>', 'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO', 'access-key-id'='<yourAccessKeyID>', 'access-key-secret'='<yourAccessKeySecret>', 'oss.endpoint'='<yourOSSEndpoint>' );DLF カタログで管理されている Iceberg テーブルにマッピングする 2 つの Flink テーブルを作成し、1 つの Flink テーブルからデータを読み取ってもう 1 つのテーブルに書き込みます。
CREATE TEMPORARY TABLE src_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); CREATE TEMPORARY TABLE dst_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); BEGIN STATEMENT SET; INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE'); INSERT INTO dst_iceberg SELECT * FROM src_iceberg; END;
参照
Apache Flink 用 Realtime Compute でサポートされているコネクタの詳細については、「サポートされているコネクタ」をご参照ください。