このトピックでは、Iceberg コネクタの使用方法について説明します。
背景情報
Apache Iceberg は、データレイク用のオープンなテーブルフォーマットです。Apache Iceberg を使用すると、Hadoop 分散ファイルシステム (HDFS) または Object Storage Service (OSS) 上に、低コストでスケーラブルなデータレイクストレージサービスを構築できます。その後、Flink、Spark、Hive、Presto などのオープンソースのビッグデータエコシステムのコンピュートエンジンを使用して、データレイク内のデータを分析できます。
|
カテゴリ |
詳細 |
|
サポートタイプ |
ソーステーブル、結果テーブル、データインジェストの送信先 |
|
ランタイムモード |
バッチモードとストリームモード |
|
データフォーマット |
該当なし |
|
特定の監視メトリクス |
なし |
|
API タイプ |
SQL、データインジェスト用の YAML ジョブ |
|
結果テーブルでのデータ更新または削除のサポート |
はい |
特徴
Apache Iceberg は、以下の主要な特徴を提供します。
-
HDFS または OSS 上に構築された、軽量でコスト効率の高いデータレイクストレージサービス。
-
ACID 特性 (原子性、一貫性、独立性、永続性) のセマンティクスを完全にサポート。
-
履歴バージョンのロールバックをサポート。
-
効率的なデータフィルタリング。
-
スキーマ進化。
-
パーティション進化。
-
自己管理型 Hive Metastore との互換性。詳細については、「自己管理型 Hive Metastore (HMS) で Hive カタログを使用する」をご参照ください。
Flink のフォールトトレランスとストリーム処理機能を使用して、大量のログデータや行動データをリアルタイムで Apache Iceberg データレイクに取り込むことができます。その後、Flink または他の分析エンジンを使用して、そのデータから価値を抽出できます。
制限事項
-
Iceberg コネクタは、Flink コンピュートエンジン Ververica Runtime (VVR) 4.0.8 以降でのみサポートされています。Iceberg コネクタは、Data Lake Formation (DLF) カタログと併用する必要があります。詳細については、「DLF-Legacy カタログの管理」をご参照ください。
-
Iceberg コネクタは、Apache Iceberg v1 および v2 テーブルフォーマットをサポートしています。詳細については、「Iceberg Table Spec」をご参照ください。
説明v2 テーブルフォーマットは、リアルタイム計算エンジン VVR 8.0.7 以降でのみサポートされています。
-
ストリーム読み取りモードでは、ソーステーブルとして追加のみの Iceberg テーブルのみがサポートされます。
構文
CREATE TABLE iceberg_table (
id BIGINT,
data STRING
PRIMARY KEY(`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
'connector' = 'iceberg',
...
);
WITH パラメーター
共通パラメーター (ソーステーブル用)
|
パラメーター |
説明 |
データ型 |
必須 |
デフォルト値 |
備考 |
|
connector |
ソーステーブルのタイプ。 |
String |
はい |
なし |
値は |
|
catalog-name |
カタログの名前。 |
String |
はい |
なし |
英語でカスタム名を入力します。 |
|
catalog-database |
データベースの名前。 |
String |
はい |
default |
DLF で作成したデータベースの名前 (例: dlf_db)。 説明
DLF データベースを作成していない場合は、作成してください。 |
|
io-impl |
分散ファイルシステムの実装クラス。 |
String |
はい |
なし |
値は |
|
oss.endpoint |
Alibaba Cloud Object Storage Service (OSS) のエンドポイント。 |
String |
いいえ |
なし |
詳細については、「リージョンとエンドポイント」をご参照ください。 説明
|
|
ご利用の Alibaba Cloud アカウントの AccessKey ID。 |
String |
はい |
なし |
詳細については、「AccessKey ID と AccessKey Secret を表示するにはどうすればよいですか?」をご参照ください。 重要
AccessKey 情報の漏洩を防ぐため、AccessKey の値には変数を使用することを推奨します。詳細については、「プロジェクト変数」をご参照ください。 |
|
ご利用の Alibaba Cloud アカウントの AccessKey Secret。 |
String |
はい |
なし |
|
|
catalog-impl |
カタログのクラス名。 |
String |
はい |
なし |
値は |
|
warehouse |
テーブルデータが格納されている OSS のパス。 |
String |
はい |
なし |
なし。 |
|
dlf.catalog-id |
ご利用の Alibaba Cloud アカウントの ID。 |
String |
はい |
なし |
アカウント ID は、ユーザー情報ページから取得できます。 |
|
dlf.endpoint |
DLF サービスのエンドポイント。 |
String |
はい |
なし |
。 説明
|
|
dlf.region-id |
DLF サービスのリージョン。 |
String |
はい |
なし |
。 説明
このリージョンは、dlf.endpoint に選択されたリージョンと同じである必要があります。 |
|
uri |
Hive メタストアの Thrift URI。 |
String |
Hive カタログを使用する場合にのみ必須です。 |
なし |
このパラメーターは、自己管理型 Hive Metastore と一緒に使用します。 |
結果テーブル固有のパラメーター
|
パラメーター |
説明 |
データ型 |
必須 |
デフォルト値 |
備考 |
|
write.operation |
書き込み操作モード。 |
String |
いいえ |
upsert |
|
|
hive_sync.enable |
Hive へのメタデータ同期を有効にするかどうかを指定します。 |
boolean |
いいえ |
false |
有効な値:
|
|
hive_sync.mode |
Hive データ同期モード。 |
String |
いいえ |
hms |
|
|
hive_sync.db |
メタデータを同期する Hive データベースの名前。 |
String |
いいえ |
現在のテーブルが存在するカタログ内のデータベースの名前。 |
なし。 |
|
hive_sync.table |
メタデータを同期する Hive テーブルの名前。 |
String |
いいえ |
現在のテーブルの名前。 |
なし。 |
|
dlf.catalog.region |
DLF サービスのリージョン。 |
String |
いいえ |
なし |
。 説明
|
|
dlf.catalog.endpoint |
DLF サービスのエンドポイント。 |
String |
いいえ |
なし |
。 説明
|
型マッピング
|
Iceberg フィールドタイプ |
Flink フィールドタイプ |
|
BOOLEAN |
BOOLEAN |
|
INT |
INT |
|
LONG |
BIGINT |
|
FLOAT |
FLOAT |
|
DOUBLE |
DOUBLE |
|
DECIMAL(P,S) |
DECIMAL(P,S) |
|
DATE |
DATE |
|
TIME |
TIME 説明
Iceberg のタイムスタンプの精度はマイクロ秒ですが、Flink のタイムスタンプの精度はミリ秒です。Flink を使用して Iceberg からデータを読み取る場合、時間の精度はミリ秒に合わせられます。 |
|
TIMESTAMP |
TIMESTAMP |
|
TIMESTAMPTZ |
TIMESTAMP_LTZ |
|
STRING |
STRING |
|
FIXED(L) |
BYTES |
|
BINARY |
VARBINARY |
|
STRUCT<...> |
ROW |
|
LIST<E> |
LIST |
|
MAP<K,V> |
MAP |
コード例
OSS バケットと DLF データベースが作成されていることを確認してください。詳細については、「コンソールでバケットを作成する」および「データベース、テーブル、関数」をご参照ください。
DLF データベースを作成し、[パス] を設定する際は、${warehouse}/${database_name}.db 形式を使用することを推奨します。たとえば、ウェアハウスアドレスが oss://iceberg-test/warehouse で、データベース名が dlf_db の場合、dlf_db の OSS パスを oss://iceberg-test/warehouse/dlf_db.db に設定します。
結果テーブルの例
この例では、Datagen コネクタを使用してストリーミングデータをランダムに生成し、それを Iceberg テーブルに書き込む方法を示します。
CREATE TEMPORARY TABLE datagen(
id BIGINT,
data STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE dlf_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
INSERT INTO dlf_iceberg SELECT * FROM datagen;
ソーステーブルの例
-
自己管理型 Hive Metastore (HMS) で Hive カタログを使用する。
Flink クラスターがネットワーク経由で HMS クラスターと通信できることを確認してください。データは
oss://<bucket>/<path>/<database-name>/flink_tableディレクトリに保存されます。CREATE TEMOPORY TABLE flink_table ( id BIGINT, data STRING ) WITH ( 'connector'='iceberg', 'catalog-name'='<yourCatalogName>', 'catalog-database'='<yourDatabaseName>', 'uri'='thrift://<ip>:<port>', 'warehouse'='oss://<bucket>/<path>', 'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO', 'access-key-id'='<yourAccessKeyId>', 'access-key-secret'='<yourAccessKeySecret>', 'oss.endpoint'='<yourOSSEndpoint>' ); -
DLF カタログを使用して、Iceberg ソーステーブルから Iceberg 結果テーブルにデータを書き込む。
CREATE TEMPORARY TABLE src_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); CREATE TEMPORARY TABLE dst_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); BEGIN STATEMENT SET; INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE'); INSERT INTO dst_iceberg SELECT * FROM src_iceberg; END;
データインジェスト
Iceberg コネクタを sink として使用し、データインジェスト用の YAML ジョブでデータを書き込むことができます。
構文
sink:
type: iceberg
name: Iceberg Sink
catalog.properties.rest.signing-region: cn-beijing
catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg
catalog.properties.warehouse: flink_iceberg
catalog.properties.type: rest
catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO
設定項目
|
パラメーター |
説明 |
必須 |
データ型 |
デフォルト値 |
備考 |
|
type |
コネクタのタイプ。 |
はい |
STRING |
なし |
値は |
|
name |
sink の名前。 |
いいえ |
STRING |
なし |
sink の名前。 |
|
catalog.properties.rest.signing-region |
DLF のリージョン ID。詳細については、「エンドポイント」をご参照ください。 |
はい |
STRING |
なし |
なし |
|
catalog.properties.uri |
DLF REST カタログへのアクセスに使用される URI。詳細については、「Iceberg REST」をご参照ください。 |
はい |
STRING |
なし |
なし |
|
catalog.properties.warehouse |
DLF カタログの名前。 |
はい |
STRING |
なし |
なし |
|
catalog.properties.warehouse |
ファイルストレージのルートディレクトリ。 |
いいえ |
STRING |
なし |
なし |
|
catalog.properties.type |
カタログタイプ。値は rest である必要があります。 |
はい |
STRING |
rest |
なし |
|
catalog.properties.io-impl |
値は org.apache.iceberg.rest.DlfFileIO である必要があります。 |
はい |
STRING |
org.apache.iceberg.rest.DlfFileIO |
なし |
|
partition.key |
各パーティションテーブルのパーティションフィールド。 |
いいえ |
STRING |
なし |
各パーティションテーブルのパーティションキー。複数のテーブルに複数のプライマリキーを設定できます。テーブルを区切るにはセミコロン ( 暗黙的な変換が必要なパーティションの場合、パーティションフィールドに直接暗黙的な変換関数を追加できます。例: |
|
table.properties.* |
Iceberg テーブルを作成するためのパラメーター。 |
いいえ |
String |
なし |
詳細については、「Iceberg table options」をご参照ください。 |
使用例
次のコードは、Iceberg カタログが DLF カタログである場合に、Alibaba Cloud Data Lake Formation にデータを書き込む構成例を示しています。
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: iceberg name: Iceberg Sink catalog.properties.rest.signing-region: cn-beijing catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg catalog.properties.warehouse: flink_iceberg catalog.properties.type: rest catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIOcatalog.properties プレフィックスを持つパラメーターについては、「Iceberg DLF カタログの作成」をご参照ください。
スキーマ進化
現在、データインジェストの sink としての Iceberg は、以下のスキーマ進化イベントをサポートしています。
-
CREATE TABLE EVENT
-
ADD COLUMN EVENT
-
ALTER COLUMN TYPE EVENT (プライマリキー列の型変更はサポートされていません)
-
RENAME COLUMN EVENT
-
DROP COLUMN EVENT
-
TRUNCATE TABLE EVENT
-
DROP TABLE EVENT
下流の Iceberg テーブルが既に存在する場合、データは既存のテーブルスキーマに基づいて書き込まれます。システムはテーブルを再作成しようとはしません。
参考文献
Flink がサポートするコネクタについては、「サポートされているコネクタ」をご参照ください。