すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Apache Iceberg

最終更新日:Mar 10, 2026

このトピックでは、Iceberg コネクタの使用方法について説明します。

背景情報

Apache Iceberg は、データレイク用のオープンなテーブルフォーマットです。Apache Iceberg を使用すると、Hadoop 分散ファイルシステム (HDFS) または Object Storage Service (OSS) 上に、低コストでスケーラブルなデータレイクストレージサービスを構築できます。その後、Flink、Spark、Hive、Presto などのオープンソースのビッグデータエコシステムのコンピュートエンジンを使用して、データレイク内のデータを分析できます。

カテゴリ

詳細

サポートタイプ

ソーステーブル、結果テーブル、データインジェストの送信先

ランタイムモード

バッチモードとストリームモード

データフォーマット

該当なし

特定の監視メトリクス

なし

API タイプ

SQL、データインジェスト用の YAML ジョブ

結果テーブルでのデータ更新または削除のサポート

はい

特徴

Apache Iceberg は、以下の主要な特徴を提供します。

  • HDFS または OSS 上に構築された、軽量でコスト効率の高いデータレイクストレージサービス。

  • ACID 特性 (原子性、一貫性、独立性、永続性) のセマンティクスを完全にサポート。

  • 履歴バージョンのロールバックをサポート。

  • 効率的なデータフィルタリング。

  • スキーマ進化。

  • パーティション進化。

  • 自己管理型 Hive Metastore との互換性。詳細については、「自己管理型 Hive Metastore (HMS) で Hive カタログを使用する」をご参照ください。

説明

Flink のフォールトトレランスとストリーム処理機能を使用して、大量のログデータや行動データをリアルタイムで Apache Iceberg データレイクに取り込むことができます。その後、Flink または他の分析エンジンを使用して、そのデータから価値を抽出できます。

制限事項

  • Iceberg コネクタは、Flink コンピュートエンジン Ververica Runtime (VVR) 4.0.8 以降でのみサポートされています。Iceberg コネクタは、Data Lake Formation (DLF) カタログと併用する必要があります。詳細については、「DLF-Legacy カタログの管理」をご参照ください。

  • Iceberg コネクタは、Apache Iceberg v1 および v2 テーブルフォーマットをサポートしています。詳細については、「Iceberg Table Spec」をご参照ください。

    説明

    v2 テーブルフォーマットは、リアルタイム計算エンジン VVR 8.0.7 以降でのみサポートされています。

  • ストリーム読み取りモードでは、ソーステーブルとして追加のみの Iceberg テーブルのみがサポートされます。

構文

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING
  PRIMARY KEY(`id`) NOT ENFORCED
)
 PARTITIONED BY (data)
 WITH (
 'connector' = 'iceberg',
  ...
);

WITH パラメーター

共通パラメーター (ソーステーブル用)

パラメーター

説明

データ型

必須

デフォルト値

備考

connector

ソーステーブルのタイプ。

String

はい

なし

値は iceberg である必要があります。

catalog-name

カタログの名前。

String

はい

なし

英語でカスタム名を入力します。

catalog-database

データベースの名前。

String

はい

default

DLF で作成したデータベースの名前 (例: dlf_db)。

説明

DLF データベースを作成していない場合は、作成してください。

io-impl

分散ファイルシステムの実装クラス。

String

はい

なし

値は org.apache.iceberg.aliyun.oss.OSSFileIO である必要があります。

oss.endpoint

Alibaba Cloud Object Storage Service (OSS) のエンドポイント。

String

いいえ

なし

詳細については、「リージョンとエンドポイント」をご参照ください。

説明
  • access.key.id: VVR 8.0.6 以前

  • access-key-id: VVR 8.0.7 以降

ご利用の Alibaba Cloud アカウントの AccessKey ID。

String

はい

なし

詳細については、「AccessKey ID と AccessKey Secret を表示するにはどうすればよいですか?」をご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、AccessKey の値には変数を使用することを推奨します。詳細については、「プロジェクト変数」をご参照ください。

  • access.key.secret: VVR 8.0.6 以前

  • access-key-secret: VVR 8.0.7 以降

ご利用の Alibaba Cloud アカウントの AccessKey Secret。

String

はい

なし

catalog-impl

カタログのクラス名。

String

はい

なし

値は org.apache.iceberg.aliyun.dlf.DlfCatalog である必要があります。

warehouse

テーブルデータが格納されている OSS のパス。

String

はい

なし

なし。

dlf.catalog-id

ご利用の Alibaba Cloud アカウントの ID。

String

はい

なし

アカウント ID は、ユーザー情報ページから取得できます。

dlf.endpoint

DLF サービスのエンドポイント。

String

はい

なし

説明
  • dlf.endpoint パラメーターを DLF の VPC エンドポイントに設定することを推奨します。たとえば、中国 (杭州) リージョンを選択した場合、dlf.endpoint パラメーターを dlf-vpc.cn-hangzhou.aliyuncs.com に設定します。

  • VPC をまたいで DLF にアクセスするには、「ストレージスペースの管理と操作」をご参照ください。

dlf.region-id

DLF サービスのリージョン。

String

はい

なし

説明

このリージョンは、dlf.endpoint に選択されたリージョンと同じである必要があります。

uri

Hive メタストアの Thrift URI。

String

Hive カタログを使用する場合にのみ必須です。

なし

このパラメーターは、自己管理型 Hive Metastore と一緒に使用します。

結果テーブル固有のパラメーター

パラメーター

説明

データ型

必須

デフォルト値

備考

write.operation

書き込み操作モード。

String

いいえ

upsert

  • upsert (デフォルト): データを更新します。

  • insert: データを追加します。

  • bulk_insert: 更新なしで一括挿入を実行します。

hive_sync.enable

Hive へのメタデータ同期を有効にするかどうかを指定します。

boolean

いいえ

false

有効な値:

  • true: 機能を有効にします。

  • false (デフォルト): 機能を無効にします。

hive_sync.mode

Hive データ同期モード。

String

いいえ

hms

  • hms (デフォルト): Hive Metastore または DLF カタログを使用する場合は、このパラメーターを hms に設定します。

  • jdbc: JDBC カタログを使用する場合は、このパラメーターを jdbc に設定します。

hive_sync.db

メタデータを同期する Hive データベースの名前。

String

いいえ

現在のテーブルが存在するカタログ内のデータベースの名前。

なし。

hive_sync.table

メタデータを同期する Hive テーブルの名前。

String

いいえ

現在のテーブルの名前。

なし。

dlf.catalog.region

DLF サービスのリージョン。

String

いいえ

なし

説明
  • dlf.catalog.region パラメーターは、hive_sync.mode が hms に設定されている場合にのみ有効です。

  • このリージョンは、dlf.catalog.endpoint に選択されたリージョンと同じである必要があります。

dlf.catalog.endpoint

DLF サービスのエンドポイント。

String

いいえ

なし

説明
  • dlf.catalog.endpoint パラメーターは、hive_sync.mode が hms に設定されている場合にのみ有効です。

  • dlf.catalog.endpoint パラメーターを DLF の VPC エンドポイントに設定することを推奨します。たとえば、中国 (杭州) リージョンを選択した場合、dlf.catalog.endpoint パラメーターを dlf-vpc.cn-hangzhou.aliyuncs.com に設定します。

  • VPC をまたいで DLF にアクセスするには、「ストレージスペースの管理と操作」をご参照ください。

型マッピング

Iceberg フィールドタイプ

Flink フィールドタイプ

BOOLEAN

BOOLEAN

INT

INT

LONG

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(P,S)

DECIMAL(P,S)

DATE

DATE

TIME

TIME

説明

Iceberg のタイムスタンプの精度はマイクロ秒ですが、Flink のタイムスタンプの精度はミリ秒です。Flink を使用して Iceberg からデータを読み取る場合、時間の精度はミリ秒に合わせられます。

TIMESTAMP

TIMESTAMP

TIMESTAMPTZ

TIMESTAMP_LTZ

STRING

STRING

FIXED(L)

BYTES

BINARY

VARBINARY

STRUCT<...>

ROW

LIST<E>

LIST

MAP<K,V>

MAP

コード例

OSS バケットと DLF データベースが作成されていることを確認してください。詳細については、「コンソールでバケットを作成する」および「データベース、テーブル、関数」をご参照ください。

説明

DLF データベースを作成し、[パス] を設定する際は、${warehouse}/${database_name}.db 形式を使用することを推奨します。たとえば、ウェアハウスアドレスが oss://iceberg-test/warehouse で、データベース名が dlf_db の場合、dlf_db の OSS パスを oss://iceberg-test/warehouse/dlf_db.db に設定します。

結果テーブルの例

この例では、Datagen コネクタを使用してストリーミングデータをランダムに生成し、それを Iceberg テーブルに書き込む方法を示します。

CREATE TEMPORARY TABLE datagen(
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;

ソーステーブルの例

  • 自己管理型 Hive Metastore (HMS) で Hive カタログを使用する。

    Flink クラスターがネットワーク経由で HMS クラスターと通信できることを確認してください。データは oss://<bucket>/<path>/<database-name>/flink_table ディレクトリに保存されます。

    CREATE TEMOPORY TABLE flink_table (
      id   BIGINT,
      data STRING
    ) WITH (
      'connector'='iceberg',
      'catalog-name'='<yourCatalogName>',
      'catalog-database'='<yourDatabaseName>',
      'uri'='thrift://<ip>:<port>',
      'warehouse'='oss://<bucket>/<path>',
      'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO',
      'access-key-id'='<yourAccessKeyId>',
      'access-key-secret'='<yourAccessKeySecret>',
      'oss.endpoint'='<yourOSSEndpoint>'
    );
  • DLF カタログを使用して、Iceberg ソーステーブルから Iceberg 結果テーブルにデータを書き込む。

    CREATE TEMPORARY TABLE src_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    CREATE TEMPORARY TABLE dst_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    BEGIN STATEMENT SET;
    
    INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
    INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
    
    END;

データインジェスト

Iceberg コネクタを sink として使用し、データインジェスト用の YAML ジョブでデータを書き込むことができます。

構文

sink:
  type: iceberg
  name: Iceberg Sink
  catalog.properties.rest.signing-region: cn-beijing
  catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg
  catalog.properties.warehouse: flink_iceberg
  catalog.properties.type: rest
  catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO

設定項目

パラメーター

説明

必須

データ型

デフォルト値

備考

type

コネクタのタイプ。

はい

STRING

なし

値は iceberg である必要があります。

name

sink の名前。

いいえ

STRING

なし

sink の名前。

catalog.properties.rest.signing-region

DLF のリージョン ID。詳細については、「エンドポイント」をご参照ください。

はい

STRING

なし

なし

catalog.properties.uri

DLF REST カタログへのアクセスに使用される URI。詳細については、「Iceberg REST」をご参照ください。

はい

STRING

なし

なし

catalog.properties.warehouse

DLF カタログの名前。

はい

STRING

なし

なし

catalog.properties.warehouse

ファイルストレージのルートディレクトリ。

いいえ

STRING

なし

なし

catalog.properties.type

カタログタイプ。値は rest である必要があります。

はい

STRING

rest

なし

catalog.properties.io-impl

値は org.apache.iceberg.rest.DlfFileIO である必要があります。

はい

STRING

org.apache.iceberg.rest.DlfFileIO

なし

partition.key

各パーティションテーブルのパーティションフィールド。

いいえ

STRING

なし

各パーティションテーブルのパーティションキー。複数のテーブルに複数のプライマリキーを設定できます。テーブルを区切るにはセミコロン (;) を使用し、パーティションキーを区切るにはカンマ (,) を使用します。たとえば、testdb.table1:id1,id2;testdb.table2:name を使用して、testdb.table1 テーブルのパーティションフィールドを id1 に、testdb.table2 テーブルのパーティションフィールドを name に設定できます。

暗黙的な変換が必要なパーティションの場合、パーティションフィールドに直接暗黙的な変換関数を追加できます。例: testdb.table1:truncate[10](id);testdb.table2:hour(create_time);testdb.table3:day(create_time);testdb.table4:month(create_time);testdb.table5:year(create_time);testdb.table6:bucket[10](create_time)

table.properties.*

Iceberg テーブルを作成するためのパラメーター。

いいえ

String

なし

詳細については、「Iceberg table options」をご参照ください。

使用例

次のコードは、Iceberg カタログが DLF カタログである場合に、Alibaba Cloud Data Lake Formation にデータを書き込む構成例を示しています。

  • source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: iceberg
      name: Iceberg Sink
      catalog.properties.rest.signing-region: cn-beijing
      catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg
      catalog.properties.warehouse: flink_iceberg
      catalog.properties.type: rest
      catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO

    catalog.properties プレフィックスを持つパラメーターについては、「Iceberg DLF カタログの作成」をご参照ください。

スキーマ進化

現在、データインジェストの sink としての Iceberg は、以下のスキーマ進化イベントをサポートしています。

  • CREATE TABLE EVENT

  • ADD COLUMN EVENT

  • ALTER COLUMN TYPE EVENT (プライマリキー列の型変更はサポートされていません)

  • RENAME COLUMN EVENT

  • DROP COLUMN EVENT

  • TRUNCATE TABLE EVENT

  • DROP TABLE EVENT

説明

下流の Iceberg テーブルが既に存在する場合、データは既存のテーブルスキーマに基づいて書き込まれます。システムはテーブルを再作成しようとはしません。

参考文献

Flink がサポートするコネクタについては、「サポートされているコネクタ」をご参照ください。