すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Apache Iceberg コネクタ

最終更新日:Apr 22, 2025

このトピックでは、Apache Iceberg コネクタの使用方法について説明します。

背景情報

Apache Iceberg は、データレイク向けのオープンなテーブル形式です。 Apache Iceberg を使用すると、Hadoop 分散ファイルシステム ( HDFS ) または Alibaba Cloud Object Storage Service ( OSS ) 上に独自のデータレイクストレージサービスを迅速に構築できます。 その後、Apache Flink、Apache Spark、Apache Hive、Apache Presto などのオープンソースビッグデータエコシステムのコンピューティングエンジンを使用して、データレイク内のデータを分析できます。 次の表に、Apache Iceberg コネクタの概要を示します。

項目

説明

テーブルタイプ

ソーステーブルとシンクテーブル

実行モード

バッチモードとストリーミングモード

データ形式

該当なし

メトリック

該当なし

API タイプ

SQL API

シンクテーブルでのデータの更新または削除

サポートされています

機能

Apache Iceberg コネクタは、次のコア機能を提供します。

  • HDFS または OSS に基づいて、低コストの軽量データレイクストレージサービスを構築します。

  • 包括的な原子性、一貫性、分離性、耐久性 ( ACID ) セマンティクスを提供します。

  • 履歴バージョンのバックトラッキングをサポートします。

  • 効率的なデータフィルタリングをサポートします。

  • スキーマ進化をサポートします。

  • パーティション進化をサポートします。

  • セルフマネージド Hive メタストアへのデータストレージをサポートします。 詳細については、「Apache Iceberg ソーステーブルの作成と使用」をご参照ください。

説明

Flink の強力なフォールトトレランスとストリーム処理機能を使用して、ログ内の大量の行動データをリアルタイムで Apache Iceberg データレイクにインポートできます。 その後、Flink または別の分析エンジンを使用して、データの値を抽出できます。

制限事項

  • Ververica Runtime ( VVR ) 4.0.8 以降を使用する Apache Flink 用 Realtime Compute のみ、Apache Iceberg コネクタをサポートしています。 Apache Iceberg コネクタは、Data Lake Formation ( DLF ) カタログと一緒に使用する必要があります。 詳細については、「DLF カタログの管理」をご参照ください。

  • Apache Iceberg コネクタは、バージョン 1 とバージョン 2 の Apache Iceberg テーブル形式をサポートしています。 詳細については、「Iceberg Table Spec」をご参照ください。

    説明

    VVR 8.0.7 以降を使用する Apache Flink 用 Realtime Compute のみ、バージョン 2 の Apache Iceberg テーブル形式をサポートしています。

  • ストリーミング読み取りモードが有効になっている場合、Append Only モードでデータが書き込まれた Apache Iceberg テーブルのみをソーステーブルとして使用できます。

構文

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING
  PRIMARY KEY(`id`) NOT ENFORCED
)
 PARTITIONED BY (data)
 WITH (
 'connector' = 'iceberg',
  ...
);

WITH 句のコネクタオプション

共通オプションとソーステーブルオプション

オプション

説明

データ型

必須

デフォルト値

備考

connector

使用するコネクタ。

STRING

はい

デフォルト値なし

値を iceberg に設定します。

catalog-name

カタログの名前。

STRING

はい

デフォルト値なし

カタログの名前を入力します。

catalog-database

データベースの名前。

STRING

はい

default

Data Lake Formation ( DLF ) で作成されたデータベースの名前に設定します。 例: dlf_db。

説明

DLF データベースを作成していない場合は、作成してください。

io-impl

分散ファイルシステムの実装クラスの名前。

STRING

はい

デフォルト値なし

値を org.apache.iceberg.aliyun.oss.OSSFileIO に設定します。

oss.endpoint

OSS バケットのエンドポイント。

STRING

いいえ

デフォルト値なし

詳細については、「リージョンとエンドポイント」をご参照ください。

説明
  • access.key.id: VVR 8.0.6 以前。

  • access-key-id: VVR 8.0.7 以降。

Alibaba Cloud アカウントの AccessKey ID。

STRING

はい

デフォルト値なし

詳細については、「アカウントの AccessKey ペアを表示するにはどうすればよいですか。」をご参照ください。

重要

資格情報のセキュリティを向上させるには、プレーンテキストで AccessKey ペアをハードコーディングすることは避け、代わりに変数を使用してください。詳細については、「キーの管理」をご参照ください。

  • access.key.secret: VVR 8.0.6 以前。

  • access-key-secret: VVR 8.0.7 以降。

Alibaba Cloud アカウントの AccessKey シークレット。

STRING

はい

デフォルト値なし

catalog-impl

カタログのクラス名。

STRING

はい

デフォルト値なし

値を org.apache.iceberg.aliyun.dlf.DlfCatalog に設定します。

warehouse

テーブルデータが格納される OSS ディレクトリ。

STRING

はい

デフォルト値なし

該当なし。

dlf.catalog-id

Alibaba Cloud アカウントの ID。

STRING

はい

デフォルト値なし

セキュリティ設定 ページにアクセスして、アカウント ID を取得できます。

dlf.endpoint

DLF のエンドポイント。

STRING

はい

デフォルト値なし

説明
  • dlf.endpoint を DLF の VPC エンドポイントに設定することをお勧めします。 たとえば、中国 (杭州) リージョンを選択した場合は、dlf.endpoint オプションを dlf-vpc.cn-hangzhou.aliyuncs.com に設定します。

  • VPC を介して DLF にアクセスする場合は、「ワークスペースと名前空間の管理と操作に関する FAQ」に記載されている手順に従ってください。

dlf.region-id

DLF サービスが有効になっているリージョンの名前。

STRING

はい

デフォルト値なし

説明

選択したリージョンが dlf.endpoint に選択したエンドポイントと一致していることを確認してください。

uri

Hive メタストアの thrift URI。

STRING

いいえ

デフォルト値なし

セルフマネージド Hive メタストアで使用されます。

説明

このオプションは、Hive カタログを使用する場合にのみ必要です。

シンク専用オプション

オプション

説明

データ型

必須

デフォルト値

備考

write.operation

書き込み操作モード。

STRING

いいえ

upsert

有効な値:

  • upsert: データが更新されます。

  • insert: データは追加モードでテーブルに書き込まれます。

  • bulk_insert: 一度に特定量のデータが書き込まれ、既存のデータは更新されません。

hive_sync.enable

メタデータを Hive に同期するかどうかを指定します。

BOOLEAN

いいえ

false

有効な値:

  • true: メタデータの Hive への同期が有効になります。

  • false: メタデータの Hive への同期が無効になります。

hive_sync.mode

Hive データ同期モード。

STRING

いいえ

hms

  • hms: Hive メタストアまたは DLF カタログを使用する場合は、デフォルト値を保持します。

  • jdbc: Java Database Connectivity ( JDBC ) カタログを使用する場合は、この値を jdbc に設定します。

hive_sync.db

データを同期する Hive データベースの名前。

STRING

いいえ

カタログ内の現在のテーブルのデータベース名

該当なし。

hive_sync.table

データを同期する Hive テーブルの名前。

STRING

いいえ

現在のテーブルの名前

該当なし。

dlf.catalog.region

DLF サービスが有効になっているリージョンの名前。

STRING

いいえ

デフォルト値なし

説明
  • dlf.catalog.region オプションは、hive_sync.mode オプションが hms に設定されている場合にのみ有効になります。

  • このオプションの値が、dlf.catalog.endpoint オプションで指定されたエンドポイントと一致していることを確認してください。

dlf.catalog.endpoint

DLF のエンドポイント。

STRING

いいえ

デフォルト値なし

説明
  • dlf.catalog.endpoint オプションは、hive_sync.mode オプションが hms に設定されている場合にのみ有効になります。

  • dlf.catalog.endpoint オプションを DLF の VPC エンドポイントに設定することをお勧めします。 たとえば、中国 (杭州) リージョンを選択した場合は、dlf.catalog.endpoint オプションを dlf-vpc.cn-hangzhou.aliyuncs.com に設定します。

  • VPC を介して DLF にアクセスする場合は、「ワークスペースと名前空間の管理と操作に関する FAQ」に記載されている手順に従ってください。

データ型マッピング

Apache Iceberg のデータ型

Apache Flink 用 Realtime Compute のデータ型

BOOLEAN

BOOLEAN

INT

INT

LONG

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(P,S)

DECIMAL(P,S)

DATE

DATE

TIME

TIME

説明

Apache Iceberg のタイムスタンプはマイクロ秒単位の精度であり、Apache Flink 用 Realtime Compute のタイムスタンプはミリ秒単位の精度です。 Apache Flink 用 Realtime Compute を使用して Apache Iceberg データを読み取ると、時間の精度はミリ秒に調整されます。

TIMESTAMP

TIMESTAMP

TIMESTAMPTZ

TIMESTAMP_LTZ

STRING

STRING

FIXED(L)

BYTES

BINARY

VARBINARY

STRUCT<...>

ROW

LIST<E>

LIST

MAP<K,V>

MAP

サンプルコード

OSS バケットと DLF データベースが作成されていることを確認します。 詳細については、「バケットの作成」および「データベーステーブルと関数」をご参照ください。

説明

DLF データベースの [ディレクトリ] を指定する場合は、${warehouse}/${database_name}.db 形式のディレクトリを入力することをお勧めします。 たとえば、warehouse オプションの値が oss://iceberg-test/warehouse で、database_name オプションの値が dlf_db の場合、dlf_db データベースの OSS ディレクトリを oss://iceberg-test/warehouse/dlf_db.db に設定します。

Apache Iceberg シンクテーブルの作成と使用

Datagen コネクタを使用してランダムなストリーミングデータを生成し、Apache Iceberg に書き込みます。

CREATE TEMPORARY TABLE datagen(
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;

Apache Iceberg ソーステーブルの作成と使用

  • Hive カタログで管理され、セルフマネージド Hive メタストアに格納されている Iceberg テーブルにマッピングする Flink テーブルを作成します。

    説明
    • Flink ワークスペースとセルフマネージド Hive メタストアクラスター間の接続が作成されていることを確認してください。

    • データファイルは oss://<bucket>/<path>/<database-name>/flink_table ディレクトリに格納されます。

    CREATE TEMOPORY TABLE flink_table (
      id   BIGINT,
      data STRING
    ) WITH (
      'connector'='iceberg',
      'catalog-name'='<yourCatalogName>',
      'catalog-database'='<yourDatabaseName>',
      'uri'='thrift://<ip>:<post>',
      'warehouse'='oss://<bucket>/<path>',
      'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO',
      'access-key-id'='<yourAccessKeyID>',
      'access-key-secret'='<yourAccessKeySecret>',
      'oss.endpoint'='<yourOSSEndpoint>'
    );
  • DLF カタログで管理されている Iceberg テーブルにマッピングする 2 つの Flink テーブルを作成し、1 つの Flink テーブルからデータを読み取ってもう 1 つのテーブルに書き込みます。

    CREATE TEMPORARY TABLE src_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    CREATE TEMPORARY TABLE dst_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    BEGIN STATEMENT SET;
    
    INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
    INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
    
    END;

参照

Apache Flink 用 Realtime Compute でサポートされているコネクタの詳細については、「サポートされているコネクタ」をご参照ください。