すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:リアルタイムでデータウェアハウスにデータを取り込む

最終更新日:Nov 09, 2025

Realtime Compute for Apache Flink を使用すると、リアルタイムでデータウェアハウスにデータを取り込むことができます。Realtime Compute for Apache Flink は、完全および増分データ同期の切り替え、メタデータの自動検出、テーブルスキーマ変更の同期、データベース同期など、複数の機能を提供し、リアルタイムのデータウェアハウスへのデータインジェストを簡素化し、リアルタイムデータ同期をより効率的かつ便利にします。このトピックでは、Realtime Compute for Apache Flink の開発コンソールで、ApsaraDB RDS for MySQL インスタンスから Hologres インスタンスにデータを同期するドラフトを作成する方法について説明します。

背景情報

たとえば、ApsaraDB RDS for MySQL インスタンスには、tpc_ds、user_db1、user_db2、user_db3 という名前の 4 つのデータベースがあります。tpc_ds データベースには、テーブルスキーマが異なる 24 のビジネステーブルが含まれています。データベース user_db1、user_db2、user_db3 ではシャーディングが実行されます。3 つのデータベースにはそれぞれ、同じテーブルスキーマを持つ 3 つのテーブルが含まれており、3 つのデータベースには user01 から user09 までの合計 9 つのテーブルがあります。次の図は、Alibaba Cloud Database Management Service (DMS) コンソールで表示できる ApsaraDB RDS for MySQL インスタンスのデータベースとテーブルを示しています。数据库和表情况

ApsaraDB RDS for MySQL インスタンスのデータベースから Hologres にテーブルとデータを同期するドラフトを開発する場合は、次の手順を実行できます。user01 から user09 という名前のテーブルをマージし、これらのテーブルのデータを Hologres テーブルに同期できます。

このトピックでは、Realtime Compute for Apache Flink でサポートされている CREATE TABLE AS 文と CREATE DATABASE AS 文を使用して、データベース全体のデータを同期し、シャーディングされたデータベース内のテーブルをマージして同期し、完全データと増分データを同期し、テーブルスキーマの変更をリアルタイムで同期します。

前提条件

テストデータの準備

  1. tpc_ds.sqluser_db1.sqluser_db2.sqluser_db3.sql をクリックして、テストデータをローカルコンピューターにダウンロードします。

  2. DMS コンソールで、ApsaraDB RDS for MySQL インスタンスのテストデータを準備します。

    1. DMS コンソールから ApsaraDB RDS for MySQL インスタンスにログインします。

      詳細については、「DMS を使用して ApsaraDB RDS for MySQL インスタンスにログインする」をご参照ください。

    2. [SQLConsole] タブで、次のコマンドを入力し、[実行] をクリックします。

      次のコマンドは、tpc_ds、user_db1、user_db2、user_db3 データベースを作成するために使用されます。

      CREATE DATABASE tpc_ds;
      CREATE DATABASE user_db1;
      CREATE DATABASE user_db2;
      CREATE DATABASE user_db3;
    3. 上部のナビゲーションバーで [データインポート] をクリックします。表示されたページで、[大容量データインポート] タブをクリックします。

    4. [大容量データインポート] タブで、[データベース] フィールドでデータをインポートするデータベースを選択し、[ファイル] をクリックして選択したデータベースの SQL ファイルをアップロードし、[送信] をクリックします。ファイルが事前チェックに合格したら、チケットを送信します。チケットが承認されたら、[変更を実行] をクリックします。表示されるダイアログボックスで、[実行の確認] をクリックします。

      この手順を繰り返して、データファイルを tpc_ds、user_db1、user_db2、user_db3 データベースに順番にインポートします。导入数据

  3. Hologres コンソールにログインし、テーブル user01 から user09 がマージされた後に取得されるデータを格納するために my_user データベースを作成します。

    データベースの作成方法の詳細については、「データベースの作成」をご参照ください。

IP アドレスホワイトリストの構成

Realtime Compute for Apache Flink が ApsaraDB RDS for MySQL および Hologres インスタンスにアクセスできるようにするには、Realtime Compute for Apache Flink ワークスペースが属する vSwitch の CIDR ブロックを ApsaraDB RDS for MySQL および Hologres のホワイトリストに追加する必要があります。

  1. Realtime Compute for Apache Flink ワークスペースが属する vSwitch の CIDR ブロックを取得します。

    1. Realtime Compute for Apache Flink の管理コンソールにログインします。

    2. フルマネージド Flink タブで、対象の [ワークスペース] を見つけ、[アクション] 列で [その他] > [ワークスペース詳細] を選択します。

    3. [ワークスペース詳細] ダイアログボックスで、[Realtime Compute For Apache Flink] ワークスペースが属する vSwitch の [CIDR ブロック] を表示します。

      网段信息

  2. Realtime Compute for Apache Flink インスタンスが属する vSwitch の CIDR ブロックを ApsaraDB RDS for MySQL インスタンスの IP アドレスホワイトリストに追加します。

    詳細については、「IP アドレスホワイトリストの構成」をご参照ください。RDS白名单

  3. Realtime Compute for Apache Flink ワークスペースが属する vSwitch の CIDR ブロックを Hologres インスタンスの IP アドレスホワイトリストに追加します。

    HoloWeb コンソールでインスタンスの IP アドレスホワイトリストを構成するには、インスタンスへの接続を設定するときに [ログオン方法] パラメーターを [パスワードなしのログオン] に設定する必要があります。詳細については、「IP アドレスホワイトリストの構成」をご参照ください。Holo白名单

ステップ 1: カタログの作成

データベース全体を同期したり、シャーディングされたデータベース内のテーブルをマージして同期したり、単一のテーブルを同期したりする場合は、宛先カタログを作成する必要があります。また、ソーステーブルのリストとソーステーブルに関する情報を取得するために、ソースカタログを作成する必要もあります。ソースカタログと宛先カタログは、Realtime Compute for Apache Flink の開発コンソールで作成できます。この例では、ソースカタログは ApsaraDB RDS for MySQL カタログで、宛先カタログは Hologres カタログです。

  1. mysql という名前の ApsaraDB RDS for MySQL カタログを作成します。

    詳細については、「MySQL カタログの管理」トピックの「Hologres カタログの管理」セクションをご参照ください。mysql catalog

  2. holo という名前の Hologres カタログを作成します。

    詳細については、「Hologres カタログの管理」トピックの「Hologres カタログの作成」セクションをご参照ください。Holo Catalog

  3. Realtime Compute for Apache Flink の開発コンソールにログインします。左側のナビゲーションウィンドウで、[カタログ] をクリックします。[カタログリスト] ページで、mysql および holo カタログが作成されているかどうかを確認します。

ステップ 2: データ同期ドラフトの開発

  1. Realtime Compute for Apache Flink の開発コンソールにログインし、ドラフトを作成します。

    1. 左側のナビゲーションウィンドウで、[開発] > [ETL] を選択します。SQL エディターページ左上の [新規] をクリックします。

    2. [新しいドラフト] ダイアログボックスの [SQL スクリプト] タブで、[空白のストリームドラフト] をクリックします。

      Realtime Compute for Apache Flink は、さまざまなコードテンプレートを提供し、データ同期をサポートしています。各コードテンプレートは、特定のシナリオ、コードサンプル、および説明を提供します。テンプレートをクリックして、Realtime Compute for Apache Flink の機能と関連する構文を学び、ビジネスロジックを実装できます。詳細については、「コードテンプレート」および「データ同期テンプレート」をご参照ください。

    3. [次へ] をクリックします。

    4. [新しいドラフト] ダイアログボックスで、ドラフトのパラメーターを構成します。次の表にパラメーターを示します。

      パラメーター

      説明

      名前

      作成するドラフトの名前。

      説明

      ドラフト名は現在のプロジェクト内で一意である必要があります。

      flink-test

      場所

      ドラフトのコードファイルが保存されるフォルダ。

      既存のフォルダの右側にある 新建文件夹 アイコンをクリックしてサブフォルダを作成することもできます。

      ドラフト

      エンジンバージョン

      デプロイメントで使用される Flink のエンジンバージョンを表示できます。エンジンバージョン、バージョンマッピング、および各バージョンのライフサイクルにおける重要な時点の詳細については、「エンジンバージョン」をご参照ください。

      vvr-6.0.4-flink-1.15

    5. [作成] をクリックします。

  2. ドラフトの次のコードをコードエディターにコピーします。

    次のサンプルコードは、ApsaraDB RDS for MySQL の tpc_ds データベース内のすべてのテーブルを Hologres の tpc_ds データベースに同期し、次にテーブル user01 から user09 をマージして Hologres の my_user.users テーブルに同期する方法を示しています。サンプルコード:

    USE CATALOG holo;
    
    BEGIN STATEMENT SET;
    
    -- ApsaraDB RDS for MySQL の tpc_ds データベース内のすべてのテーブルを Hologres の tpc_ds データベースに同期します。
    CREATE DATABASE IF NOT EXISTS tpc_ds
    AS DATABASE mysql.tpc_ds INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='8001-8004') */ ;
    
    -- テーブル user01 から user09 を Hologres の my_user.users テーブルに同期します。
    CREATE TABLE IF NOT EXISTS my_user.users
    AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;

    CREATE DATABASE AS 文は、tpc_ds データベース内のすべてのテーブルを Hologres に同期するために使用されます。CREATE TABLE AS 文は、テーブル user01 から user09 を Hologres の単一のテーブルに同期するために使用されます。STATEMENT SET 文は、1 つのデプロイメントで CREATE DATABASE AS 文と CREATE TABLE AS 文を組み合わせてコミットするために使用されます。Realtime Compute for Apache Flink はソースを自動的に最適化し、1 つのソースノードを再利用して複数の ApsaraDB RDS for MySQL テーブルからデータを読み取ります。これにより、ApsaraDB RDS for MySQL の接続数とデータ読み取り負荷が大幅に削減され、読み取りの安定性が向上します。

    説明

    データベース内の特定のテーブルを同期する場合は、CREATE DATABASE AS 文に INCLUDING TABLE または EXCLUDING TABLE を追加して、同期するテーブルを指定できます。たとえば、INCLUDING TABLE 'web.*' は、データベース内で名前が web で始まるテーブルのみを同期する必要があることを示します。

ステップ 3: デプロイメントの開始

  1. [ETL] ページの右上隅にある [デプロイ] をクリックします。表示されるダイアログボックスで、[確認] をクリックします。部署

    説明

    セッションクラスターは、開発環境やテスト環境などの非本番環境に適用できます。セッションクラスターにドラフトをデプロイまたはデバッグして、JobManager のリソース使用率を向上させ、デプロイメントの起動を高速化できます。セッションクラスターにドラフトをデプロイしないことをお勧めします。セッションクラスターにドラフトをデプロイすると、安定性の問題が発生する可能性があります。詳細については、「開発およびテスト環境 (セッションクラスター) の構成」をご参照ください。

  2. 左側のナビゲーションウィンドウで、[O&M] > [デプロイメント] を選択します。[デプロイメント] ページで、管理するデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。パラメーターの構成方法の詳細については、「ジョブデプロイメントの開始」をご参照ください。

  3. [ジョブの開始] ダイアログボックスで、[開始] をクリックします。

    デプロイメントが開始された後、[デプロイメント] ページでデプロイメントのステータスと情報を表示できます。作业状态

ステップ 4: 完全データ同期結果の表示

  1. Hologres コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[HoloWeb に移動] をクリックします。HoloWeb の [メタデータ管理] ページで、Hologres インスタンスの tpc_ds データベースにある 24 のテーブルとテーブルデータを表示します。

    holo表数据

  3. [メタデータ管理] ページで、my_user データベースの users テーブルのスキーマを表示します。

    次の図は、完全データ同期後のテーブルスキーマとデータを示しています。

    • テーブルスキーマ表结构

      users テーブルのスキーマでは、ApsaraDB RDS for MySQL ソーステーブルのスキーマに基づいて _db_name 列と _table_name 列が追加されます。_db_name 列はデータソースのデータベース名を示し、_table_name 列はデータソースのテーブル名を示します。この 2 つの列は、シャーディングされたデータベース内のテーブルがマージされた後にデータが一意であることを保証するために、複合プライマリキーの一部として使用されます。

    • テーブルデータ

      users タブの右上隅にある [テーブルのクエリ] をクリックします。SQL エディターで、次のコマンドを入力し、[実行] をクリックします:

      select * from users order by _db_name,_table_name,id;

      次の図はテーブルデータを示しています。表数据

ステップ 5: 増分同期結果の表示

完全データ同期が完了すると、システムはデータ同期デプロイメントを増分データ同期フェーズに自動的に切り替えます。手動での介入は必要ありません。Realtime Compute for Apache Flink の開発コンソールの [アラーム] タブで、特定の時点での currentEmitEventTimeLag の値に基づいて、データ同期デプロイメントのデータ同期フェーズを判断できます。

  1. Realtime Compute for Apache Flink の管理コンソールにログインします。

  2. [フルマネージド Flink] タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

  3. 左側のナビゲーションウィンドウで、[O&M] > [デプロイメント] を選択します。[デプロイメント] ページで、管理するデプロイメントの名前をクリックします。

  4. [アラーム] タブをクリックします。

  5. currentEmitEventTimeLag のチャートを表示して、デプロイメントのデータ同期フェーズを判断します。

    数据曲线

    • ある時点での currentEmitEventTimeLag の値が 0 の場合、デプロイメントは完全データ同期フェーズで実行されます。

    • ある時点での currentEmitEventTimeLag の値が 0 より大きい場合、デプロイメントは増分同期フェーズに入ります。

  6. データ変更とスキーマ変更のリアルタイム同期を検証します。

    MySQL CDC データソースを使用すると、増分データ同期中にテーブルデータの変更とスキーマの変更をリアルタイムで同期できます。デプロイメントが増分データ同期フェーズに入った後、ApsaraDB RDS for MySQL インスタンスのテーブル user01 から user09 のテーブルスキーマとデータを変更して、データ変更とスキーマ変更のリアルタイム同期を検証できます。

    1. DMS コンソールを使用して ApsaraDB RDS for MySQL インスタンスにログインします。

      詳細については、「DMS を使用して ApsaraDB RDS for MySQL インスタンスにログインする」をご参照ください。

    2. user_db2 データベースで、次のコマンドを実行して user02 テーブルのスキーマを変更し、user02 テーブルにデータを挿入し、user05 テーブルのデータを更新します:

      USE DATABASE `user_db2`;
      ALTER TABLE `user02` ADD COLUMN `age` INT;   -- user02 テーブルに age 列を追加します。
      INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30); -- age 情報を含むデータを user02 テーブルに挿入します。
      UPDATE `user05` SET name='JARK' WHERE id=15;  -- name フィールドの特定の値を大文字に変更します。

    3. Hologres コンソールで、users テーブルのスキーマとデータの変更を表示します。

      users タブの右上隅にある [テーブルのクエリ] をクリックします。SQL エディターで、次のコマンドを入力し、[実行] をクリックします:

      select * from users order by _db_name,_table_name,id;

      次の図はテーブルデータを示しています。表结构和数据变化シャーディングされたデータベース内の複数のテーブルのスキーマは異なります。ただし、user02 テーブルのスキーマとデータの変更、および user05 テーブルのデータの変更は、宛先テーブルにリアルタイムで同期されます。Hologres の users テーブルでは、age 列が追加され、Tony の age データが挿入され、名前 JARK が大文字で表示されます。

(オプション) ステップ 6: デプロイメントのリソースの構成

最適なデプロイメントパフォーマンスを確保するために、処理する必要があるデータ量に基づいて、デプロイメントの並列度とさまざまなノードのリソース構成を調整することをお勧めします。デプロイメントの並列度と CU 数を簡単に調整するには、基本リソース構成モードを使用します。デプロイメントの並列度とノードのリソース構成をより詳細に調整するには、エキスパートリソース構成モードを使用します。

  1. 左側のナビゲーションウィンドウで、[O&M] > [デプロイメント] を選択します。[デプロイメント] ページで、管理する開発の名前をクリックします。

  2. [構成] タブの [リソース] セクションの右上隅にある [編集] をクリックします。

  3. [モード] パラメーターに [エクスポート] を選択します。次に、[今すぐプランを取得] をクリックします。

  4. ポインターを [その他] の上に移動し、[すべて展開] をクリックします。

    完全なトポロジーを表示して、デプロイメントのデータ同期プランを確認できます。プランには、データを同期する必要があるテーブルが表示されます。

  5. 各ノードの PARALLELISM を手動で構成します。

    holo.tpc_ds.store_sales ノードを除くすべてのシンクノードの PARALLELISM を 4 に設定します。tpc_ds データベースの store_sales テーブルには、最大量のデータが含まれています。Hologres へのデータ書き込みのパフォーマンスを向上させるには、holo.tpc_ds.store_sales ノードの PARALLELISM を 8 に設定できます。リソースパラメーターの構成方法の詳細については、「デプロイメントの構成」をご参照ください。

  6. [リソース] セクションの右上隅にある [保存] をクリックします。

  7. デプロイメントを再起動します。

    デプロイメントのリソースが構成された後、構成を有効にするにはデプロイメントを再起動する必要があります。

  8. デプロイメントの名前をクリックします。[概要] タブで、調整後の効果を表示します。

よくある質問

リファレンス