すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink CDC を利用したデータインジェスト

最終更新日:Dec 10, 2025

Realtime Compute for Apache Flink を使用すると、Apache Flink の Change Data Capture (CDC) コネクタに基づいて、ソースからシンクへのデータインジェスト用の YAML スクリプトを作成できます。このトピックでは、YAML を使用してデータインジェストパイプラインを開発し、MySQL データベースから StarRocks にすべてのデータを同期する方法について説明します。

前提条件

背景情報

例えば、ApsaraDB RDS for MySQL インスタンスに `order_dw_mysql` という名前のデータベースがあるとします。`order_dw_mysql` データベースには、`orders`、`orders_pay`、`product_catalog` という 3 つのビジネス テーブルが作成されています。`order_dw_mysql` データベースのビジネス テーブルから StarRocks の `order_dw_sr` データベースにデータを同期するデータインジェストパイプラインを開発する場合は、次の手順を実行します。

  1. ステップ 1:テストデータの準備

  2. ステップ 2:データインジェストジョブの開発

  3. ステップ 3:ジョブの開始

  4. ステップ 4:StarRocks での同期結果の確認

ステップ 1:テストデータの準備

  1. ApsaraDB RDS for MySQL インスタンスでデータベースを作成し、そのデータベース用のアカウントを作成します。

    `order_dw_mysql` という名前のデータベースと、そのデータベースに対する読み取り/書き込み権限を持つ標準アカウントを作成します。詳細については、「アカウントとデータベースの作成」および「データベースの管理」をご参照ください。

  2. Data Management (DMS) コンソールから ApsaraDB RDS for MySQL インスタンスにログインします。

    詳細については、「DMS を使用して ApsaraDB RDS for MySQL インスタンスにログインする」をご参照ください。

  3. [SQL コンソール] タブで、次の文を入力し、[実行] をクリックして、データベースに 3 つのビジネス テーブルを作成し、データを挿入します。

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null 
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- Prepare data.
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

ステップ 2:データインジェストジョブの開発

  1. Realtime Compute for Apache Flink の管理コンソールにログインします。

  2. 管理したいワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。Realtime Compute for Apache Flink の開発コンソールの左側のナビゲーションウィンドウで、[開発] > [データインジェスト] を選択します。

  3. [+] > [テンプレートから新規ドラフトを作成] をクリックします。ダイアログボックスで、[Data Ingestion From MySQL To Starrocks] をクリックし、[次へ] をクリックします。

  4. [名前][場所][エンジンバージョン] を設定し、[OK] をクリックします。

  5. ジョブコードを記述します。

    次のコードは、ApsaraDB RDS for MySQL の `order_dw_mysql` データベースから StarRocks の `order_dw_sr` データベースにすべてのテーブルを同期する方法の例を示しています。

    source:
      type: mysql
      hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: order_dw_mysql.\.*
      server-id: 5405-5415
    
    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
      load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
      username: ${secret_values.starrocksusername}
      password: ${secret_values.starrockspassword}
      table.create.properties.replication_num: 1
      sink.buffer-flush.interval-ms: 5000 # 5 秒ごとにデータをフラッシュします。
      
    route:
      - source-table: order_dw_mysql.\.*
        sink-table: order_dw_sr.<>
        replace-symbol: <>
        description: route all tables in source_db to sink_db
    
    pipeline:
      name: Sync MySQL Database to StarRocks

    次の表に、この例の ApsaraDB RDS for MySQL および StarRocks インスタンスに必要な構成情報を示します。データインジェストに使用されるその他のオプションの詳細については、「MySQL」および「StarRocks」をご参照ください。

    説明

    YAML スクリプトで開発されたデータインジェストジョブは、名前空間変数のみをサポートします。セキュリティリスクを防ぐため、認証情報をハードコーディングするのではなく、変数を使用することを推奨します。詳細については、「変数の管理」をご参照ください。

    カテゴリ

    オプション

    説明

    source

    hostname

    ApsaraDB RDS for MySQL データベースへのアクセスに使用する IP アドレスまたはホスト名。

    データベースの VPC エンドポイントを入力することを推奨します。

    rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com

    port

    ApsaraDB RDS for MySQL データベースへのアクセスに使用するポート番号。

    3306

    username

    ApsaraDB RDS for MySQL データベースへのアクセスに使用するユーザー名とパスワード。「ステップ 1:テストデータの準備」で作成したアカウントのユーザー名とパスワードにオプションを設定します。

    ${secret_values.mysqlusername}

    password

    ${secret_values.mysqlpassword}

    tables

    ApsaraDB RDS for MySQL テーブルの名前。正規表現を使用して、複数のテーブルからデータを読み取ることができます。

    この例では、`order_dw_mysql` データベース内のすべてのテーブルとデータが同期されます。

    order_dw_mysql.\.*

    server-id

    データベースクライアントに割り当てられる数値 ID。

    5405-5415

    sink

    jdbc-url

    データベースへの接続に使用される Java Database Connectivity (JDBC) URL。

    JDBC URL には、特定の IP アドレスとフロントエンド (FE) の JDBC ポートが含まれます。このオプションには、jdbc:mysql://ip:port 形式で値を指定します。

    EMR コンソールの EMR Serverless StarRocks インスタンスの [インスタンス詳細] タブに移動して、FE 詳細セクションでインスタンスの [内部エンドポイント][クエリポート] を確認できます。

    jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030

    load-url

    FE ノードへの接続に使用される HTTP サービス URL。

    EMR コンソールの EMR Serverless StarRocks インスタンスの [インスタンス詳細] タブに移動して、FE 詳細セクションでインスタンスの [内部エンドポイント][HTTP ポート] を確認できます。

    fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030

    username

    StarRocks データベースへのアクセスに使用するユーザー名とパスワード。

    EMR Serverless StarRocks インスタンスを作成したときに使用したユーザー名とパスワードにオプションを設定します。

    説明

    この例では、プレーンテキストのパスワード情報によるセキュリティリスクを防ぐために変数が使用されています。詳細については、「変数の管理」をご参照ください。

    ${secret_values.starrocksusername}

    password

    ${secret_values.starrockspassword}

    sink.buffer-flush.interval-ms

    Flink がバッファーをフラッシュする頻度を指定します。この例ではデータセットが小さいため、結果生成を早めるために短い間隔が設定されています。

    5000

    route

    source-table

    ソーステーブルを指定します。

    正規表現を使用して複数のテーブルを照合します。例えば、order_dw_mysql.\.*order_dw_mysql データベース内のすべてのテーブルをルーティングします。

    order_dw_mysql.\.*

    sink-table

    データルーティングの宛先を指定します。

    replace-symbol のシンボルを各ソーステーブル名のプレースホルダーとして使用して、多対多のルーティングを実装できます。

    ルーティング構成ルールの詳細については、「ルートモジュール」をご参照ください。

    order_dw_sr.<>

    replace-symbol

    パターンマッチングが使用されるときに、ソーステーブルの名前で置き換えられる文字列。

    <>

  6. [デプロイ] をクリックします。

ステップ 3:ジョブの開始

  1. [データインジェスト] ページの右上の [デプロイ] をクリックします。ダイアログボックスで、パラメーターを設定し、[確認] をクリックします。

  2. Realtime Compute for Apache Flink の開発コンソールの左側のナビゲーションウィンドウで、[O&M] > [デプロイメント] を選択します。[デプロイメント] ページで、対象のデプロイメントを見つけ、[操作] 列の [開始] をクリックします。

  3. [ジョブの開始] パネルで、ジョブの起動設定を行い、[開始] をクリックします。

    この例では、[初期モード] が選択されています。パラメーターの詳細については、「ジョブデプロイメントの開始」をご参照ください。ジョブを開始した後、[デプロイメント] ページでジョブデプロイメントのステータスと情報を確認できます。

ステップ 4:StarRocks での同期結果の確認

YAML デプロイメントが [実行中] 状態になった後、StarRocks でデータ同期のステータスを確認できます。

  1. EMR StarRocks Manager を使用して EMR Serverless StarRocks インスタンスにアクセスします。詳細については、「EMR StarRocks Manager を使用して EMR Serverless StarRocks インスタンスにアクセスする」をご参照ください。

  2. 表示されたページの左側のナビゲーションウィンドウで、[SQL エディター] をクリックします。[データベース] タブで、image アイコンをクリックします。

    `default_catalog` に `order_dw_sr` という名前のデータベースが表示されます。

  3. [クエリ] タブで、[+ ファイル] をクリックします。[ファイルの作成] ダイアログボックスで、パラメーターを設定して [クエリスクリプト] を作成します。クエリスクリプトの編集ページで、次の SQL 文を入力し、[実行] をクリックします。

    SELECT * FROM default_catalog.order_dw_sr.orders order by order_id;
    SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id;
    SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;
  4. 文の下に表示される同期結果を確認します。

    ApsaraDB RDS for MySQL データベースのテーブルと同じ名前のテーブルと、そのテーブルのデータが StarRocks データベースにすでに存在しています。

    image

関連ドキュメント