すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:MySQL ソースコネクタの作成

最終更新日:Jan 11, 2025

このトピックでは、MySQL ソースコネクタを作成し、DataWorks を使用して ApsaraDB RDS for MySQL から ApsaraMQ for Kafka インスタンストピックにデータを同期する方法について説明します。

前提条件

以下の要件が満たされている必要があります。
  • コネクタ機能が ApsaraMQ for Kafka インスタンスで有効になっていること。詳細については、コネクタ機能の有効化 をご参照ください。
    重要 ApsaraMQ for Kafka インスタンスが、中国 (深セン)、中国 (成都)、中国 (北京)、中国 (張家口)、中国 (杭州)、中国 (上海)、またはシンガポールリージョンにデプロイされていることを確認してください。
  • ApsaraDB RDS for MySQL インスタンスが作成されていること。 ApsaraDB RDS for MySQL インスタンスの作成方法については、ApsaraDB RDS for MySQL インスタンスの作成 をご参照ください。
  • ApsaraDB RDS for MySQL インスタンスにデータベースとデータベースアカウントが作成されていること。データベースとデータベースアカウントの作成方法については、アカウントとデータベースの作成 をご参照ください。
  • データベースにテーブルが作成されていること。 ApsaraDB RDS for MySQL で使用される一般的な SQL ステートメントについては、MySQL でよく使用される SQL ステートメント をご参照ください。
  • Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーを使用しているかどうかに関係なく、DataWorks が Elastic Network Interface (ENI) にアクセスする権限を持っていること。アカウントに権限を付与するには、クラウド リソースアクセス承認 ページに移動します。
    重要 RAM ユーザーを使用する場合は、RAM ユーザーに次の権限が付与されていることを確認してください。
    • AliyunDataWorksFullAccess: Alibaba Cloud アカウント内のすべての DataWorks リソースを管理するための権限。
    • AliyunBSSOrderAccess: Alibaba Cloud サービスを購入するための権限。

    RAM ユーザーにポリシーをアタッチする方法については、ステップ 2: RAM ユーザーに権限を付与する をご参照ください。

  • データソースとデータデスティネーションの両方が自分のアカウントを使用して作成されていること。データソースは ApsaraDB RDS for MySQL インスタンスです。データデスティネーションは ApsaraMQ for Kafka インスタンスです。
  • ApsaraDB RDS for MySQL インスタンスが存在する Virtual Private Cloud (VPC) の CIDR ブロックと、ApsaraMQ for Kafka インスタンスが存在する VPC の CIDR ブロックが重複していないこと。CIDR ブロックが重複している場合、Message Queue for Apache Kafka に MySQL ソースコネクタを作成することはできません。

背景情報

ApsaraMQ for Kafka コンソール で MySQL ソースコネクタを作成して、ApsaraDB RDS for MySQL インスタンスのテーブルから ApsaraMQ for Kafka インスタンストピックにデータを同期できます。コネクタは、次の図に示すように、DataWorks を使用して作成および実行されます。mysql_connector

ApsaraMQ for Kafka コンソールで MySQL ソースコネクタが作成されると、DataWorks Basic Edition が自動的に無料でアクティブになります。DataWorks Basic Edition は、データ統合用の DataWorks ワークスペースと専用リソースグループを作成します。DataWorks ワークスペースは無料で、専用リソースグループは 有料サービス です。データ統合用の専用リソースグループの仕様は、4 vCPU と 8 GB メモリです。リソースグループは月額サブスクリプションをサポートしています。デフォルトでは、データ統合用の専用リソースグループは期限切れ時に自動的に更新されます。DataWorks の課金については、概要 をご参照ください。

さらに、DataWorks は、MySQL ソースコネクタの構成に基づいて、ApsaraMQ for Kafka にデスティネーショントピックを自動的に生成します。ソーステーブルとデスティネーショントピックは、1 対 1 のマッピングに基づいて関連付けられます。デフォルトでは、DataWorks はプライマリキーを持つテーブルごとに 6 つのパーティションを含むトピックと、プライマリキーを持たないテーブルごとに 1 つのパーティションを含むトピックを生成します。Message Queue for Apache Kafka インスタンスで使用可能なトピック数とパーティション数が十分であることを確認してください。そうでない場合、DataWorks がトピックを作成できないため、MySQL ソースコネクタの作成に失敗します。

各トピックの名前は、<指定されたプレフィックス>_<ソーステーブルの名前> 形式です。アンダースコア (_) はシステムによって自動的に追加されます。次の図は例を示しています。

table_topic_match

この例では、指定されたプレフィックスは mysql です。同期されるソーステーブルは、table_1、table_2、...、table_n です。DataWorks は、ソーステーブルから同期されたデータを受信するためのトピックを自動的に生成します。トピックの名前は、mysql_table_1、mysql_table_2、...、mysql_table_n です。

注意事項

  • リージョン
    • データソースとデスティネーションの Message Queue for Apache Kafka インスタンスは、同じリージョンにデプロイされていない場合があります。この場合、Alibaba Cloud アカウント内に Cloud Enterprise Network (CEN) インスタンスがあり、データソースと Message Queue for Apache Kafka インスタンスの VPC が CEN インスタンスにアタッチされていることを確認してください。さらに、データソースとデータデスティネーション間のエンドツーエンドの接続を確保するために、リージョン間の接続の帯域幅が構成されていることを確認してください。

      そうでない場合、CEN インスタンスが自動的に作成され、デスティネーションの Message Queue for Apache Kafka インスタンスと専用リソースグループが存在する Elastic Compute Service (ECS) インスタンスの VPC が CEN インスタンスにアタッチされ、エンドツーエンドの接続が確保されます。自動的に作成された CEN インスタンスの帯域幅は、帯域幅が手動で構成されていないため、非常に低くなります。これにより、MySQL ソースコネクタの作成時またはコネクタの実行時に接続エラーが発生する可能性があります。

    • データソースとデスティネーションの Message Queue for Apache Kafka インスタンスは、同じリージョンにある場合があります。この場合、MySQL ソースコネクタを作成すると、データソースまたは Message Queue for Apache Kafka インスタンスの VPC に ENI が自動的に作成されます。ENI は、専用リソースグループが存在する ECS インスタンスにも自動的にバインドされます。これにより、データソースとデータデスティネーション間のエンドツーエンドの接続が確保されます。
  • DataWorks の専用リソースグループ
    • DataWorks では、各専用リソースグループを使用して最大 3 つの MySQL ソースコネクタを実行できます。MySQL ソースコネクタを作成するときに、既存のリソースグループで 3 つ未満の MySQL ソースコネクタが実行されていることが DataWorks によって検出されると、DataWorks はこのリソースグループを使用して新しく作成された MySQL ソースコネクタを実行します。
    • DataWorks の各専用リソースグループは、最大 2 つの VPC の ENI に関連付けることができます。リソースグループに関連付ける ENI と、リソースグループに既に関連付けられている ENI の CIDR ブロックが重複している場合、またはその他の技術的な制限により、DataWorks はリソースグループを使用して MySQL ソースコネクタを作成できない場合があります。この場合、このリソースグループで 3 つ未満のコネクタが実行されている場合でも、DataWorks は MySQL ソースコネクタを作成できるようにリソースグループを作成します。

MySQL ソースコネクタを作成してデプロイする

  1. ApsaraMQ for Kafka コンソール にログインします。

  2. リソースの分布概要 ページの セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. 左側のナビゲーションペインで、Connector タスクリスト をクリックします。

  4. Connector タスクリスト ページで、インスタンスの選択 ドロップダウンリストからコネクタが属するインスタンスを選択し、Connector の作成 をクリックします。

  5. Connector の作成 ウィザードで、次の操作を実行します。
    1. 基本情報の設定 ステップで、[名前] フィールドにコネクタ名を入力し、次へ をクリックします。
      パラメーター説明
      名前コネクタの名前。次の命名規則に基づいてコネクタ名を指定します。
      • コネクタ名は 1 ~ 48 文字で、数字、小文字、ハイフン (-) を使用できます。名前はハイフン (-) で始めることはできません。
      • 各コネクタ名は、ApsaraMQ for Kafka インスタンス内で一意である必要があります。

      コネクタは、connect-コネクタ名 形式で名前が付けられたコンシューマーグループを使用する必要があります。グループ このコンシューマーグループを作成していない場合、Message Queue for Apache Kafka は自動的にコンシューマーグループを作成します。グループ

      kafka-source-mysql
      インスタンスMessage Queue for Apache Kafka インスタンスに関する情報。デフォルトでは、インスタンスの名前と ID が表示されます。demo alikafka_post-cn-st21p8vj****
    2. ソースサービスの設定 ステップで、ソースサービスとして [ApsaraDB RDS for MySQL] を選択し、次の表に示すパラメーターを構成して、Next をクリックします。
      パラメーター説明
      RDS インスタンスが所属するリージョンApsaraDB RDS for MySQL インスタンスのリージョン ID。ドロップダウンリストからリージョン ID を選択します。中国 (深セン)
      ApsaraDB RDS のインスタンス IDデータを同期する ApsaraDB RDS for MySQL インスタンスの ID。rm-wz91w3vk6owmz****
      データベース名データを同期する ApsaraDB RDS for MySQL データベースの名前。mysql-to-kafka
      データベースアカウントApsaraDB RDS for MySQL データベースへのアクセスに使用するアカウントのユーザー名。mysql_to_kafka
      データベースアカウントのパスワードApsaraDB RDS for MySQL データベースへのアクセスに使用するアカウントのパスワード。該当なし
      データベーステーブルデータを同期する 1 つ以上の ApsaraDB RDS for MySQL テーブルの名前。複数のテーブル名はカンマ (,) で区切ります。

      ソーステーブルとデスティネーショントピックは、1 対 1 のマッピングに基づいて関連付けられます。

      mysql_tbl
      データテーブルの自動追加システムがデータベース内の他のテーブルを自動的に追加できる正規表現。システムは、データベースにテーブルが作成され、正規表現と一致することを検出すると、テーブル内のデータを Message Queue for Apache Kafka に自動的に同期します。

      .* を使用して、データベース内のすべてのテーブルを指定できます。

      .*
      Topic プレフィックスApsaraMQ for Kafka でコネクタ用に自動的に作成されるトピックに名前を付けるために使用されるプレフィックス。各トピック名は、プレフィックスと ApsaraDB RDS for MySQL データベース内の対応するソーステーブルの名前で構成されます。プレフィックスがグローバルに一意であることを確認してください。mysql
      重要
      ApsaraDB RDS for MySQL データベースアカウントには、少なくとも次の権限が付与されていることを確認してください。
      • SELECT
      • REPLICATION SLAVE
      • REPLICATION CLIENT
      次のステートメントは、アカウントに権限を付与する方法の例を示しています。
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'アカウントのユーザー名' @'%'; // データベースアカウントに SELECT、REPLICATION SLAVE、REPLICATION CLIENT の権限を付与します。
    3. ターゲットサービスの設定 ステップで、データを同期するデスティネーションの ApsaraMQ for Kafka インスタンスが表示されます。情報を確認し、作成 をクリックします。
  6. Connector タスクリスト ページに移動し、作成したコネクタを見つけて、デプロイ操作 列の をクリックします。
    Connector タスクリスト ページで、コネクタの ステータス パラメーターの値が 実行中 として表示されている場合、コネクタは作成されています。
    説明 コネクタの作成に失敗した場合は、このトピックで説明されているすべての前提条件が満たされているかどうかを確認してください。

    MySQL ソースコネクタの構成を変更する必要がある場合は、操作 列の タスクの設定 をクリックします。リダイレクトされた DataWorks コンソールで、コネクタの構成を変更します。

結果の確認

  1. ApsaraDB RDS for MySQL データベースのソーステーブルにデータを挿入します。
    次のサンプルコードは、ソーステーブルにデータを挿入する方法の例を示しています。
    INSERT INTO mysql_tbl
        (mysql_title, mysql_author, submission_date)
        VALUES
        ("mysql2kafka", "tester", NOW())
    SQL ステートメントの詳細については、MySQL でよく使用される SQL ステートメント をご参照ください。
  2. ApsaraMQ for Kafka のメッセージクエリ機能を使用して、ApsaraDB RDS for MySQL データベースのソーステーブルのデータが ApsaraMQ for Kafka インスタンスの対応するトピックに同期されているかどうかを確認します。
    詳細については、メッセージのクエリ をご参照ください。
    次のサンプルコードは、ApsaraDB RDS for MySQL のソーステーブルから ApsaraMQ for Kafka のトピックに同期されたデータの例を示しています。メッセージの構造とフィールドの詳細については、付録: メッセージ形式 をご参照ください。
    {
        "schema":{
            "dataColumn":[
                {
                    "name":"mysql_id",
                    "type":"LONG"
                },
                {
                    "name":"mysql_title",
                    "type":"STRING"
                },
                {
                    "name":"mysql_author",
                    "type":"STRING"
                },
                {
                    "name":"submission_date",
                    "type":"DATE"
                }
            ],
            "primaryKey":[
                "mysql_id"
            ],
            "source":{
                "dbType":"MySQL",
                "dbName":"mysql_to_kafka",
                "tableName":"mysql_tbl"
            }
        },
        "payload":{
            "before":null,
            "after":{
                "dataColumn":{
                    "mysql_title":"mysql2kafka",
                    "mysql_author":"tester",
                    "submission_date":1614700800000
                }
            },
            "sequenceId":"1614748790461000000",
            "timestamp":{
                "eventTime":1614748870000,
                "systemTime":1614748870925,
                "checkpointTime":1614748870000
            },
            "op":"INSERT",
            "ddl":null
        },
        "version":"0.0.1"
    }