すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:リアルタイムログのデータインジェストによるクイックスタート

最終更新日:Mar 19, 2026

本トピックでは、Realtime Compute for Apache Flink を使用して、ApsaraMQ for Kafka から Hologres へログデータをリアルタイムで同期する方法について説明します。本チュートリアルを完了すると、ユーザー記録を継続的に Hologres データウェアハウスへストリーミングする実行中の Flink SQL ジョブが完成し、ソーススキーマの変更にも自動的に対応できるようになります。

アーキテクチャ概要

このパイプラインは、以下のデータフローに従います:

[Faker コネクタ] → [ApsaraMQ for Kafka] → [Realtime Compute for Apache Flink] → [Hologres]

Faker コネクタは合成ユーザー記録を生成し、それを ApsaraMQ for Kafka のトピックへ書き込みます。Realtime Compute for Apache Flink はそのトピックからデータを読み取り、Flink SQL を用いて変換を行い、結果を Hologres データウェアハウスへ書き込みます。これらの 3 つのサービスは、すべて同一の VPC 内に配置する必要があります。

本チュートリアルの手順

  1. IP アドレスホワイトリストの設定

  2. ApsaraMQ for Kafka インスタンス向けテストデータの準備

  3. Hologres カタログの作成(CTAS メソッドのみ)

  4. データ同期ジョブの開発および開始

  5. 完全データ同期の結果の確認

  6. テーブルスキーマ変更の自動同期機能の確認

前提条件

開始する前に、以下の条件が満たされていることを確認してください:

重要

ApsaraMQ for Kafka および Hologres のインスタンスは、Flink ワークスペースと同じ VPC 内に配置されている必要があります。これは厳格なネットワーク要件です。サービスが同じ VPC 内に配置されていない場合、接続エラーが発生します。同じ VPC 内にない場合は、先に進む前にそれらの間で接続を作成する必要があります。詳細については、「Realtime Compute for Apache Flink は VPC をまたいでサービスにアクセスできますか?」または「Realtime Compute for Apache Flink はインターネットにアクセスできますか?」をご参照ください。

ステップ 1:IP アドレスホワイトリストの設定

Flink ワークスペースから Kafka インスタンスおよび Hologres インスタンスへアクセスできるようにするため、Flink ワークスペースが配置されている vSwitch の CIDR ブロックを、両インスタンスのホワイトリストに追加します。

  1. Flink ワークスペースが配置されている vSwitch の CIDR ブロックを取得します。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. 対象のワークスペースを見つけ、[アクション] 列で [その他] > [ワークスペースの詳細] を選択します。

    3. ワークスペースの詳細 ダイアログボックスで、vSwitch の CIDR ブロック をコピーします。

    vSwitch CIDR block in Workspace Details dialog
  2. Kafka インスタンスの IP アドレスのホワイトリストに CIDR ブロックを追加する

    CIDR ブロックを、[VPC] ネットワークのエンドポイントの許可リストに追加します。

    Kafka allowlist configuration
  3. Hologres インスタンスの IP アドレスホワイトリストに CIDR ブロックを追加する

    Hologres allowlist configuration

ステップ 2:ApsaraMQ for Kafka インスタンス向けテストデータの準備

Realtime Compute for Apache Flink の Faker コネクタ をデータ生成ツールとして使用し、データを ApsaraMQ for Kafka インスタンスに書き込みます。

説明

このステップおよびステップ 4 で使用するため、properties.bootstrap.servers エンドポイントの値が必要です。この値を取得するには、次の手順に従います。ApsaraMQ for Kafka コンソールに移動し、ターゲットインスタンス名をクリックします。[エンドポイント情報] セクションを [インスタンスの詳細] ページで見つけ、[VPC] ネットワークのエンドポイントを特定し、[ドメイン名] 列の値をコピーします。フォーマットは host:port,host:port,host:port です。

  1. ApsaraMQ for Kafka コンソールで、users という名前のトピックを作成します

  2. Kafka トピックへデータを書き込むジョブを開発します。

    1. Realtime Compute for Apache Flink の管理コンソール にログインします。

    2. 対象のワークスペースを見つけ、[コンソール][操作] 列でクリックします。

    3. 左側ナビゲーションウィンドウで、開発 > ETL を選択します。表示されたページで、新規作成 をクリックします。

    4. 新規ドラフト ダイアログボックスで、空のストリームドラフト を選択し、次へ をクリックします。ドラフトを以下のように設定します:

      設定項目

      説明

      名前

      kafka-data-input

      SQL ドラフトの名前。:ドラフト名は、現在の名前空間内で一意である必要があります。

      場所

      開発

      ドラフトのコードファイルが保存されるフォルダ。デフォルトでは、ドラフトのコードファイルは 開発 フォルダに保存されます。既存のフォルダの右側にあるアイコンをクリックすることで、サブフォルダを作成することもできます。

      エンジンバージョン

      vvr-8.0.11-flink-1.17

      ドラフトのエンジンバージョンをドロップダウンリストから選択します。

    5. 作成 をクリックします。

    6. 以下のコードスニペットを SQL エディターにコピー&ペーストし、properties.bootstrap.servers のプレースホルダー値を実際の Kafka エンドポイント(上記の注を参照)に置き換えます。

      CREATE TEMPORARY TABLE source ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, event_time TIMESTAMP ) WITH ( 'connector' = 'faker', 'number-of-rows' = '100', 'rows-per-second' = '10', 'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}', 'fields.first_name.expression' = '#{name.firstName}', 'fields.last_name.expression' = '#{name.lastName}', 'fields.address.country.expression' = '#{Address.country}', 'fields.address.state.expression' = '#{Address.state}', 'fields.address.city.expression' = '#{Address.city}', 'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}' ); CREATE TEMPORARY TABLE sink ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, `timestamp` TIMESTAMP METADATA ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json' ); INSERT INTO sink SELECT * FROM source;

      設定項目

      説明

      properties.bootstrap.servers

      alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

      Kafka ブローカーのエンドポイント。形式: host:port,host:port,host:port。取得方法については、上記の注を参照してください。

      topic

      users

      Kafka トピックの名前。

  3. ジョブを開始します。

    1. SQL エディターの右上隅にある デプロイ をクリックします。

    2. ドラフトのデプロイ ダイアログボックスで、確認 をクリックします。

    3. ジョブのリソースを構成する

    4. O&M」>「デプロイメント」に移動し、対象のデプロイメントを見つけ、「開始」列の「操作」をクリックします。デプロイメントを開始する際に設定する必要があるパラメーターについて詳しくは、「デプロイメントの開始」をご参照ください。

    5. デプロイメント ページで、デプロイメントの状態を確認します。

      Deployment state showing RUNNING then FINISHED

      Faker コネクタは有限ストリームを提供するため、デプロイメントは RUNNING 状態に達してから約 1 分後に FINISHED 状態に遷移します。終了後、データは送信先の Kafka トピックへ書き込まれます。以下は、ApsaraMQ for Kafka へ書き込まれた JSON 形式のメッセージのサンプルです:

      { "id": 765, "first_name": "Barry", "last_name": "Pollich", "address": { "country": "United Arab Emirates", "state": "Nevada", "city": "Powlowskifurt" } }

ステップ 3:Hologres カタログの作成

説明

本ステップは、CTAS メソッドのみ に必要です。ステップ 4 で INSERT INTO メソッドを使用する予定の場合は、本ステップをスキップして、直接ステップ 4 に進んでください。

CTAS を使用した単一テーブル同期の場合、Realtime Compute for Apache Flink の開発コンソールで、送信先カタログとして Hologres カタログを作成します。このセクションでは、基本的な設定項目について説明します。詳細については、「Hologres カタログの作成」をご参照ください。

設定項目

説明

カタログ名

任意の名前を入力します。本例では holo を使用します。

エンドポイント

ご利用の Hologres インスタンスのエンドポイント。

ユーザー名

Alibaba Cloud アカウントの AccessKey ID。

パスワード

Alibaba Cloud アカウントの AccessKey Secret。

dbname

Hologres の既存のデータベース名を入力します。この例では、[flink_test_db] を使用します。重要: 進める前に、[flink_test_db] データベースが既に Hologres インスタンスに作成されていることを確認してください。そうでないと、エラーが発生します。詳細については、「データベースの作成」をご参照ください。

ステップ 4:データ同期ジョブの開発および開始

同期方法の選択

コードを記述する前に、ご使用のユースケースに合った方法を選択してください:

CTAS

INSERT INTO

テーブル作成

Flink が Hologres テーブルを自動的に作成

まず Hologres で手動でテーブルを作成

JSON 処理

ネストされたカラムが自動展開される(json.infer-schema.flatten-nested-columns.enable = true

ネストされた JSON をネイティブな JSONB カラムに直接マッピング可能

スキーマ進化

新しいネストカラムが出現した際に、Hologres スキーマが自動更新される

手動でのスキーマ更新が必要

ステップ 3 の必要性

はい — Hologres カタログを事前に作成する必要があります

いいえ — 接続は WITH 句で直接指定

推奨用途

迅速なセットアップ;スキーマ柔軟なパイプライン

カラム型に対する完全な制御;JSONB 最適化

同期ジョブの作成

  1. Realtime Compute for Apache Flink の管理コンソール にログインします。

  2. ターゲットのワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。

  3. 左側ナビゲーションウィンドウで、開発 > ETL を選択します。表示されたページで、新規作成 をクリックします。

  4. 新規ドラフト ダイアログボックスで、空のストリームドラフト を選択し、次へ をクリックします。ドラフトを以下のように設定します:

    設定項目

    説明

    名前

    flink-quickstart-test

    SQL ドラフトの名前。:ドラフト名は、現在の名前空間内で一意である必要があります。

    場所

    開発

    ドラフトのコードファイルが保存されるフォルダ。デフォルトでは、ドラフトのコードファイルは 開発 フォルダに保存されます。既存のフォルダの右側にあるアイコンをクリックすることで、サブフォルダを作成することもできます。

    エンジンバージョン

    vvr-8.0.11-flink-1.17

    ドラフトのエンジンバージョンをドロップダウンリストから選択します。

  5. 作成 をクリックします。

  6. 以下のいずれかのコードスニペットを SQL エディターにコピー&ペーストし、プレースホルダー値を実際の値に置き換えます。

方法 1:CTAS

CREATE TABLE AS(CTAS)文により、Hologres 内に sync_kafka_users テーブルが自動的に作成され、JSON や JSONB としてカラム型を手動で定義する必要がなくなります。

CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country'), PRIMARY KEY (`partition`, `offset`) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- ネストされたカラムを自動展開します。 'scan.startup.mode' = 'earliest-offset' ); CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users WITH ( 'connector' = 'hologres' ) AS TABLE kafka_users;
説明

partition および offset をプライマリキーとして宣言することで、ジョブのフェールオーバー後に重複データが発生するのを防ぎます。データが再送信された場合でも、Hologres には同じ partition および offset 値を持つデータの 1 つのコピーのみが保持されます。

プレースホルダー値を置き換えます:

設定項目

説明

properties.bootstrap.servers

alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

Kafka ブローカーのエンドポイント。形式: host:port,host:port,host:port。取得方法については、ステップ 2 の注を参照してください。

topic

users

Kafka トピックの名前。

方法 2:INSERT INTO

ネストされた JSON データを Hologres のネイティブ JSONB カラムとして格納したい場合に、INSERT INTO 文を使用します。この方法では、ジョブを実行する前に、Hologres で sync_kafka_users テーブルを手動で作成する必要があります。

CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, 'address' STRING, -- このカラムのデータはネストされた JSON データです。 `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country') ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- ネストされたカラムを自動展開します。 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE holo ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT, `partition` BIGINT, `timestamp` TIMESTAMP, `date` DATE, `country` STRING ) WITH ( 'connector' = 'hologres', 'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80', 'username' = '************************', 'password' = '******************************', 'dbname' = 'flink_test_db', 'tablename' = 'sync_kafka_users' ); INSERT INTO holo SELECT * FROM kafka_users;

プレースホルダー値を置き換えます:

設定項目

説明

properties.bootstrap.servers

alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

Kafka ブローカーのエンドポイント。形式: host:port,host:port,host:port。取得方法については、ステップ 2 の注を参照してください。

topic

users

Kafka トピックの名前。

endpoint

hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80

Hologres インスタンスのエンドポイント。形式: <ip>:<port>。取得方法:Hologres コンソールに移動し、インスタンス名をクリックします。次に、ネットワーク情報 セクションで、VPC の選択 のエンドポイントを特定します。

username

************************

Alibaba Cloud アカウントの AccessKey ID です。重要: 認証情報を保護するため、AccessKey ペアをプレーンテキストでハードコーディングすることは避け、代わりに変数を使用してください。詳細については、「変数の管理」をご参照ください。

password

******************************

Alibaba Cloud アカウントの AccessKey Secret。

dbname

flink_test_db

Hologres データベースの名前。

tablename

sync_kafka_users

Hologres テーブルの名前です。: INSERT INTO を使用する場合は、事前に送信先の Hologres データベースに sync_kafka_users テーブルを作成し、必要なフィールドを定義する必要があります。public スキーマを使用しない場合は、schema.tableName 形式で tablename を指定します。

ジョブのデプロイおよび開始

  1. ドラフトを保存します。

  2. デプロイ をクリックします。

  3. [運用と保守]」 > 「[デプロイメント]」に移動し、対象のデプロイメントを見つけ、「[操作]」列の「[開始]」をクリックします。デプロイメントの開始時に設定するパラメーターについて詳しくは、「デプロイメントの開始」をご参照ください。

  4. デプロイメント ページで、デプロイメントの状態および詳細を確認します。

    Deployment state on the Deployments page

ステップ 5:完全データ同期の結果の確認

同期ジョブが RUNNING 状態に達した後、データが Hologres へ書き込まれていることを確認します。

  1. Hologres コンソール にログインします。

  2. インスタンス ページで、対象インスタンスの名前をクリックします。

  3. ページの右上隅で、インスタンスへの接続 をクリックします。

  4. メタデータ管理 タブで、sync_kafka_users テーブルのスキーマおよびデータを確認します。

    sync_kafka_users table in Metadata Management
    • テーブルスキーマsync_kafka_users テーブルの名前をダブルクリックして、テーブルスキーマを表示します。

      sync_kafka_users table schema
      説明

      Kafka の partition および offset フィールドを Hologres テーブルのプライマリキーとして宣言します。デプロイメントのフェールオーバーによりデータが再送信された場合でも、同じ partition および offset 値を持つデータは 1 つのコピーのみが保存されます。

    • テーブルデータsync_kafka_users テーブルページの右上隅で、テーブルの照会 をクリックします。SQL エディターで以下のステートメントを実行し、実行 をクリックします:

      SELECT * FROM public.sync_kafka_users order by partition, "offset";

      100 行のデータが表示され、Faker コネクタの 'number-of-rows' = '100' 設定と一致しているはずです。

      sync_kafka_users table data

ステップ 6:テーブルスキーマ変更の自動同期機能の確認

本ステップでは、スキーマ進化を実証します。新しいネストカラムを含む Kafka メッセージを送信し、Hologres が対応するカラムを自動的に追加することを確認します。

  1. ApsaraMQ for Kafka コンソールで、新しいカラムを含むメッセージを送信します。

    1. ApsaraMQ for Kafka コンソール にログインします。

    2. インスタンス ページで、対象インスタンスの名前をクリックします。

    3. 左側ナビゲーションウィンドウで、トピック をクリックします。users という名前のトピックを特定します。

    4. [メッセージの送信][操作] 列でクリックします。

    5. メッセージの送信および消費の開始 パネルで、以下のパラメーターを設定します。

      Send Message panel configuration

      設定項目

      送信方法

      コンソール を選択します。

      メッセージキー

      flinktest を入力します。

      メッセージコンテンツ

      メッセージコンテンツ フィールドに、以下の JSON コンテンツをコピー&ペーストします。:本メッセージには、新しいネストカラム house-points が含まれています。

      指定パーティションへの送信

      はい を選択します。

      パーティション ID

      0 を入力します。

      { "id": 100001, "first_name": "Dennise", "last_name": "Schuppe", "address": { "country": "Isle of Man", "state": "Montana", "city": "East Coleburgh" }, "house-points": { "house": "Pukwudgie", "points": 76 } }
    6. OK をクリックします。

  2. Hologres コンソールで、sync_kafka_users テーブルのスキーマおよびデータの変更を確認します。

    1. Hologres コンソール にログインします。

    2. インスタンス ページで、対象インスタンスの名前をクリックします。

    3. ページの右上隅で、インスタンスへの接続 をクリックします。

    4. メタデータ管理 タブで、sync_kafka_users テーブルの名前をダブルクリックします。

    5. sync_kafka_users テーブルページの右上隅で、テーブルの照会 をクリックします。SQL エディターで以下のステートメントを実行し、実行 をクリックします:

      SELECT * FROM public.sync_kafka_users order by partition, "offset";
    6. テーブルデータを確認します。

      sync_kafka_users table data after schema change

      結果より、id が 100001 のデータレコードが Hologres へ書き込まれていることがわかります。また、house-points.house および house-points.points のカラムも Hologres へ自動的に追加されています。

      説明

      Kafka の users テーブルの WITH 句で json.infer-schema.flatten-nested-columns.enabletrue に設定されているため、Realtime Compute for Apache Flink は、新たに出現したネストカラムを自動的に展開します。各カラムへのアクセスパスが、そのカラム名となります。

まとめおよび次のステップ

Realtime Compute for Apache Flink を使用して、ApsaraMQ for Kafka から Hologres へユーザー記録をリアルタイムでストリーミングするログインジェストパイプラインを、自動スキーマ進化機能付きで正常に構築しました。

本チュートリアルでは、以下の作業を行いました:

  • ネットワークホワイトリストを構成し、Flink が Kafka および Hologres へアクセスできるようにしました。

  • Faker をベースとしたデータ生成器をデプロイし、100 件の合成レコードを Kafka トピックへ書き込みました。

  • Kafka から Hologres へデータを継続的に同期する Flink SQL ジョブを作成しました。

  • スキーマ変更(新しいネストカラム)が Hologres へ自動的に伝播されることを検証しました。

さらに学習を進めるには、以下のトピックをご参照ください:

  • CREATE TABLE AS (CTAS) 文 — 高度な CTAS オプションおよび動作について学びます。

  • Kafka コネクタ — 本番ワークロード向けのコネクタの全構成オプションを確認します。

  • デプロイメントを設定する — 本番稼働用にリソース割り当て、チェックポイント、およびその他のデプロイメント設定を調整します。

リファレンス