すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ログデータをデータウェアハウスにリアルタイムで取り込む

最終更新日:Jun 11, 2025

このトピックでは、Realtime Compute for Apache Flink を使用して、Kafka から Hologres にログデータをリアルタイムで同期する方法について説明します。

前提条件

ステップ 1:IP アドレスのホワイトリストを設定する

Flink ワークスペースが Kafka インスタンスと Hologres インスタンスにアクセスできるようにするには、Flink ワークスペースが存在する vSwitch の CIDR ブロックを Kafka インスタンスと Hologres インスタンスのホワイトリストに追加する必要があります。

  1. Flink ワークスペースが存在する vSwitch CIDR ブロックを取得します。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. 対象の [ワークスペース] を見つけ、[アクション] 列で [詳細] > [ワークスペースの詳細] を選択します。

    3. [ワークスペースの詳細] ダイアログボックスで、vSwitch の [CIDR ブロック] をコピーします。

      网段信息

  2. CIDR ブロックを Kafka インスタンスの IP ホワイトリストに追加します

    [VPC] ネットワークを使用してエンドポイントのホワイトリストを設定します。Kafka白名单

  3. CIDR ブロックを Hologres インスタンスの IP ホワイトリストに追加します

    Holo白名单

ステップ 2:ApsaraMQ for Kafka インスタンスのテストデータを準備する

Realtime Compute for Apache Flink の Faker コネクタ をデータジェネレーターとして使用し、データを ApsaraMQ for Kafka インスタンスに書き込みます。 Realtime Compute for Apache Flink の開発コンソールで次の手順を実行して、Kafka インスタンスにデータを書き込むことができます。

  1. users という名前のトピックを ApsaraMQ for Kafka コンソールで作成します

  2. Kafka トピックにデータを書き込むジョブを開発します。

    1. Realtime Compute for Apache Flink の管理コンソール にログインします。

    2. 対象のワークスペースを見つけて、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで、[開発] > [ETL] を選択します。 表示されたページで、[新規] をクリックします。

    4. [新規ドラフト] ダイアログボックスで、[空のストリームドラフト] などのテンプレートを選択します。 [次へ] をクリックします。 次に、以下の表に示すようにドラフトを設定します。

      設定項目

      説明

      [名前]

      kafka-data-input

      SQL ドラフトの名前。

      説明

      ドラフト名は、現在の名前空間で一意である必要があります。

      [場所]

      開発

      ドラフトのコードファイルが保存されるフォルダ。 デフォルトでは、ドラフトのコードファイルは [開発] フォルダに保存されます。

      既存のフォルダの右側にある 新建文件夹 アイコンをクリックして、サブフォルダを作成することもできます。

      [エンジンバージョン]

      vvr-8.0.11-flink-1.17

      ドロップダウンリストからドラフトのエンジンバージョンを選択します。

    5. [作成] をクリックします。

    6. コードを記述します。

      次のコードスニペットを SQL エディターにコピーして貼り付け、必要な変更を加えます。

      CREATE TEMPORARY TABLE source (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        event_time TIMESTAMP
      ) WITH (
        'connector' = 'faker',
        'number-of-rows' = '100',  -- 生成する行数
        'rows-per-second' = '10', -- 1 秒あたりの行数
        'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}', -- id フィールドの式
        'fields.first_name.expression' = '#{name.firstName}', -- 名のフィールドの式
        'fields.last_name.expression' = '#{name.lastName}', -- 姓のフィールドの式
        'fields.address.country.expression' = '#{Address.country}', -- 国のフィールドの式
        'fields.address.state.expression' = '#{Address.state}', -- 州のフィールドの式
        'fields.address.city.expression' = '#{Address.city}', -- 市のフィールドの式
        'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}' -- イベント時間の式
      );
      
      CREATE TEMPORARY TABLE sink (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        `timestamp` TIMESTAMP METADATA
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092', -- Kafka ブローカーのエンドポイント
        'topic' = 'users', -- Kafka トピック名
        'format' = 'json' -- フォーマット
      );
      
      INSERT INTO sink SELECT * FROM source;

      上記のコマンドのプレースホルダー値を実際の値に置き換えます。

      設定項目

      説明

      properties.bootstrap.servers

      alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

      Kafka ブローカーの IP アドレスまたはエンドポイント。

      形式:host:port,host:port,host:port。 複数の host:port ペアはコンマ (,) で区切ります。

      説明

      ApsaraMQ for Kafka インスタンスのエンドポイントを取得するには、次の手順に従います。

      1. ApsaraMQ for Kafka コンソールで、ターゲットインスタンス名をクリックします。

      2. 表示される [インスタンスの詳細] ページの [エンドポイント情報] セクションで、VPC ネットワークのエンドポイントを見つけます。

      3. [ドメイン名] 列の値をコピーします。

      topic

      users

      Kafka トピックの名前。

  3. ジョブを開始します。

    1. SQL エディターの右上隅にある [デプロイ] をクリックします。

    2. [ドラフトのデプロイ] ダイアログボックスで、[確認] をクリックします。

    3. ジョブリソースを設定します

    4. [O&M] > [デプロイメント] に移動し、対象のデプロイメントを見つけて、[アクション] 列の [開始] をクリックします。

      デプロイメントの開始時に設定する必要があるパラメーターについては、「デプロイメントを開始する」をご参照ください。

    5. [デプロイメント] ページで、デプロイメントの状態を表示します。image

      Faker コネクタは有限ストリームを提供します。 したがって、デプロイメントが [実行中] のまま約 1 分後、デプロイメントは [完了] になります。 デプロイメントが完了すると、データが宛先 Kafka トピックに書き込まれたことを示します。 ApsaraMQ for Kafka に書き込まれた JSON 形式のメッセージの例を以下に示します。

      {
        "id": 765,
        "first_name": "Barry",
        "last_name": "Pollich",
        "address": {
          "country": "United Arab Emirates",
          "state": "Nevada",
          "city": "Powlowskifurt"
        }
      }

ステップ 3:Hologres カタログを作成する

単一テーブルの同期を実行する場合は、宛先カタログに宛先テーブルを作成する必要があります。 Realtime Compute for Apache Flink の開発コンソールで宛先カタログを作成できます。 このトピックでは、Hologres カタログが宛先カタログとして使用されます。 このセクションでは、Hologres カタログを作成する際の必須設定項目について簡単に説明します。 詳細については、「Hologres カタログを作成する」をご参照ください。

設定項目

説明

[カタログ名]

カスタム名を入力します。 この例では、[holo] が使用されています。

[エンドポイント]

Hologres インスタンスのエンドポイント。

[ユーザー名]

Alibaba Cloud アカウントの AccessKey ID。

[パスワード]

Alibaba Cloud アカウントの AccessKey シークレット。

[dbname]

Hologres の既存のデータベースの名前を入力します。 この例では、[flink_test_db] を使用します。

重要

このフィールドに指定された [flink_test_db] データベースが Hologres インスタンスに既に作成されていることを確認してください。 そうでない場合、エラーが発生します。 詳細については、Hologres ドキュメントの「データベースを作成する」をご参照ください。

ステップ 4:データ同期ジョブを開発して開始する

  1. Realtime Compute for Apache Flink の開発コンソールにログインし、データを同期するジョブを開発します。

    1. Realtime Compute for Apache Flink の管理コンソール にログインします。

    2. 対象のワークスペースを見つけて、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで、[開発] > [ETL] を選択します。 表示されたページで、[新規] をクリックします。

    4. [新規ドラフト] ダイアログボックスで、[空のストリームドラフト] などのテンプレートを選択します。 [次へ] をクリックします。 次に、以下の表に示すようにドラフトを設定します。

      設定項目

      説明

      [名前]

      flink-quickstart-test

      SQL ドラフトの名前。

      説明

      ドラフト名は、現在の名前空間で一意である必要があります。

      [場所]

      開発

      ドラフトのコードファイルが保存されるフォルダ。 デフォルトでは、ドラフトのコードファイルは [開発] フォルダに保存されます。

      既存のフォルダの右側にある 新建文件夹 アイコンをクリックして、サブフォルダを作成することもできます。

      [エンジンバージョン]

      vvr-8.0.11-flink-1.17

      ドロップダウンリストからドラフトのエンジンバージョンを選択します。

    5. [作成] をクリックします。

  2. コードを記述します。 次のコードスニペットを SQL エディターにコピーして貼り付け、必要な変更を加えます。

    次のいずれかの方法を使用して、users Kafka トピックから Hologres の flink_test_db データベースの sync_kafka_users テーブルにデータを同期します。

    CTAS

    CREATE TABLE AS (CTAS) 文をデータ同期に使用すると、ターゲットテーブルが宛先データベースに存在しない場合に、ターゲットテーブルが自動的に作成されます。 次のコードスニペットでは、CTAS 文を使用して Kafka トピックから Hologres テーブルにデータを同期する方法を示します。

    CREATE TEMPORARY TABLE kafka_users (
      `id` INT NOT NULL,
      `address` STRING,
      `offset` BIGINT NOT NULL METADATA,
      `partition` BIGINT NOT NULL METADATA,
      `timestamp` TIMESTAMP METADATA,
      `date` AS CAST(`timestamp` AS DATE),
      `country` AS JSON_VALUE(`address`, '$.country'),
      PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
      'topic' = 'users',
      'format' = 'json',
      'json.infer-schema.flatten-nested-columns.enable' = 'true', -- ネストされた列を自動的に展開します。
      'scan.startup.mode' = 'earliest-offset'
    );
    
    CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users
    WITH (
      'connector' = 'hologres'
    ) AS TABLE kafka_users;
    説明

    タスクのフェイルオーバー後に Hologres に重複データが書き込まれるのを防ぐために、関連するプライマリキーをテーブルに追加して、データを一意に識別できます。データが再送信された場合、Hologres は同じパーティションとオフセット値を持つデータのコピーが1つだけ保持されるようにします。

    上記のコマンドのプレースホルダーの値を実際の値に置き換えます。

    構成項目

    説明

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

    Kafka ブローカーの IP アドレスまたはエンドポイント。

    フォーマット:host:port,host:port,host:port。複数の host:port ペアはカンマ (,) で区切ります。

    説明

    ApsaraMQ for Kafka インスタンスのエンドポイントを取得するには、次の手順に従います。

    1. ApsaraMQ for Kafka [コンソール] で、ターゲットインスタンス名をクリックします。

    2. 表示される [インスタンスの詳細] ページの [エンドポイント情報] セクションで、VPC ネットワークのエンドポイントを見つけます。

    3. [ドメイン名] 列の値をコピーします。

    topic

    users

    Kafka Topic の名前。

    INSERT INTO

    Hologres の JSON データと JSONB データを最適化するために特別なメソッドが使用されます。そのため、INSERT INTO 文を使用して、ネストされた JSON データを Hologres に同期できます。

    INSERT INTO 文をデータ同期に使用する場合、宛先テーブルを事前に作成する必要があります。 次のコードスニペットでは、INSERT INTO 文を使用して Kafka トピックから Hologres テーブルにデータを同期する方法を示します。

    CREATE TEMPORARY TABLE kafka_users (
      `id` INT NOT NULL,
      'address' STRING, -- この列のデータはネストされた JSON データです。
      `offset` BIGINT NOT NULL METADATA,
      `partition` BIGINT NOT NULL METADATA,
      `timestamp` TIMESTAMP METADATA,
      `date` AS CAST(`timestamp` AS DATE),
      `country` AS JSON_VALUE(`address`, '$.country')
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
      'topic' = 'users',
      'format' = 'json',
      'json.infer-schema.flatten-nested-columns.enable' = 'true', -- ネストされた列を自動的に展開します。
      'scan.startup.mode' = 'earliest-offset'
    );
    
    CREATE TEMPORARY TABLE holo (
      `id` INT NOT NULL,
      `address` STRING,
      `offset` BIGINT,
      `partition` BIGINT,
      `timestamp` TIMESTAMP,
      `date` DATE,
      `country` STRING
    ) WITH (
      'connector' = 'hologres',
      'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80',
      'username' = '************************',
      'password' = '******************************',
      'dbname' = 'flink_test_db',
      'tablename' = 'sync_kafka_users'
    );
    
    INSERT INTO holo
    SELECT * FROM kafka_users;

    上記のコマンドのプレースホルダー値を実際の値に置き換えます。

    構成項目

    説明

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9092、alikafka-host2.aliyuncs.com:9092、alikafka-host3.aliyuncs.com:9092

    Kafka ブローカーの IP アドレスまたはエンドポイント。

    フォーマット: host:port,host:port,host:port。複数の host:port ペアはカンマ (,) で区切ります。

    説明

    ApsaraMQ for Kafka インスタンスのエンドポイントを取得するには、次の手順に従います。

    1. ApsaraMQ for Kafka [コンソール] で、ターゲットインスタンス名をクリックします。

    2. 表示される [インスタンスの詳細] ページの [エンドポイント情報] セクションで、VPC ネットワークのエンドポイントを見つけます。

    3. [ドメイン名] 列の値をコピーします。

    トピック

    ユーザー

    Kafka トピックの名前。

    エンドポイント

    hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80

    Hologres インスタンスのエンドポイントです。

    フォーマット: <ip>:<port>。

    説明

    エンドポイント値を取得するには、次の手順に従います。

    1. Hologres コンソールに移動します。

    2. Hologres インスタンスの名前をクリックします。

    3. インスタンス詳細ページの [ネットワーク情報] セクションで、[VPC を選択] に対応するエンドポイントを見つけて、エンドポイント値をコピーします。

    ユーザー名

    ************************

    Hologres データベースへのアクセスに使用するユーザー名とパスワード。 Alibaba Cloud アカウントの AccessKey ID とシークレットを入力します。

    重要

    資格情報のセキュリティを向上させるために、プレーンテキストで AccessKey ペアをハードコーディングすることは避けてください。代わりに変数を使用してください。詳細については、「変数を管理する」をご参照ください。

    パスワード

    ******************************

    データベース名

    flink_test_db

    アクセスする Hologres データベースの名前。

    テーブル名

    sync_kafka_users

    Hologres テーブルの名前。

    説明
    • INSERT INTO 文を使用してデータを同期する場合、同期先 Hologres インスタンスのデータベースに sync_kafka_users テーブルを作成し、必要なフィールドを事前に定義する必要があります。

    • パブリックスキーマを使用しない場合は、schema.tableName 形式で tablename を指定する必要があります。

  3. 下書きを保存します。

  4. [デプロイ] をクリックします。

  5. [O&M] > [デプロイメント] に移動し、対象のデプロイメントを見つけ、[開始][アクション] 列でクリックします。

    デプロイメントの起動時に設定するパラメーターについては、「デプロイメントを開始する」をご参照ください。

    デプロイメントが開始された後、[デプロイメント] ページで、デプロイメントの状態やその他の情報を表示できます。image

ステップ 5:完全データ同期の結果を確認する

  1. Hologres コンソール にログインします。

  2. [インスタンス] ページで、ターゲットインスタンスの名前をクリックします。

  3. ページの右上隅にある [インスタンスに接続] をクリックします。

  4. [メタデータ管理] タブで、users という名前の Kafka Topic からデータを受信する sync_kafka_users テーブルのスキーマとデータを表示します。

    sync_kafka_users表

    次の図は、データ同期後の sync_kafka_users テーブルのスキーマとデータを示しています。

    • テーブルスキーマ

      sync_kafka_users テーブルの名前をダブルクリックして、テーブルスキーマを表示します。

      表结构

      説明

      データ同期ジョブを開発する際は、Kafka のパーティションとオフセットフィールドを Hologres テーブルのプライマリキーとして宣言することをお勧めします。 これにより、デプロイメントのフェールオーバーが原因でデータが再送信された場合、同じパーティションとオフセット値を持つデータのコピーが 1 つだけ保存されます。

    • テーブルデータ

      sync_kafka_users テーブルのページの右上隅にある [テーブルのクエリ] をクリックします。 SQL エディターで、次の文をコピーして貼り付け、[実行] をクリックします。

      SELECT * FROM public.sync_kafka_users order by partition, "offset";

      次の図は、sync_kafka_users テーブルのデータを示しています。表数据

ステップ 6:テーブルスキーマの変更が自動的に同期されるかどうかを確認する

  1. ApsaraMQ for Kafka コンソールで、新しい列を含むメッセージを送信します。

    1. ApsaraMQ for Kafka コンソール にログインします。

    2. [インスタンス] ページで、ターゲットインスタンスの名前をクリックします。

    3. 表示されるページの左側のナビゲーションウィンドウで、[トピック] をクリックします。表示されるページで、users という名前のトピックを見つけます。

    4. [メッセージの送信][アクション] 列でクリックします。

    5. [メッセージの送受信を開始] パネルで、次のようにパラメーターを構成します。

      image

      構成項目

      送信方法

      [コンソール] を選択します。

      メッセージキー

      flinktest と入力します。

      メッセージコンテンツ

      次の JSON コンテンツをコピーして [メッセージコンテンツ] フィールドに貼り付けます。

      {
        "id": 100001,
        "first_name": "Dennise",
        "last_name": "Schuppe",
        "address": {
          "country": "Isle of Man",
          "state": "Montana",
          "city": "East Coleburgh"
        },
        "house-points": {
          "house": "Pukwudgie",
          "points": 76
        }
      }
      説明

      上記の JSON コードでは、house-points は新しいネストされた列です。

      指定されたパーティションに送信

      [はい] を選択します。

      パーティション ID

      0 と入力します。

    6. [OK] をクリックします。

  2. Hologres コンソールで、sync_kafka_users テーブルのスキーマとデータの変更を表示します。

    1. Hologres コンソール にログインします。

    2. [インスタンス] ページで、ターゲットインスタンスの名前をクリックします。

    3. ページの右上隅にある [インスタンスに接続] をクリックします。

    4. [メタデータ管理] タブで、sync_kafka_users テーブルの名前をダブルクリックします。

    5. sync_kafka_users テーブルのページの右上隅にある [テーブルのクエリ] をクリックします。SQL エディターで、次のステートメントを入力し、[実行] をクリックします。

      SELECT * FROM public.sync_kafka_users order by partition, "offset";
    6. テーブルのデータを表示します。

      次の図は、sync_kafka_users テーブルのデータを示しています。Hologres表结果

      この図は、ID 100001 のデータレコードが Hologres に書き込まれていることを示しています。さらに、house-points.house 列と house-points.points 列が Hologres に追加されています。

      説明

      ApsaraMQ for Kafka のテーブルに挿入されるデータには、ネストされた列 house-points のデータのみが含まれています。ただし、Kafka users テーブルを作成するための WITH 句で json.infer-schema.flatten-nested-columns.enable が指定されています。この場合、Realtime Compute for Apache Flink は新しいネストされた列を自動的に展開します。列が展開された後、列にアクセスするためのパスが列の名前として使用されます。

参考文献