すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:移行ツールを使用する

最終更新日:May 23, 2025

このトピックでは、オープンソースの移行ツールを使用して、セルフマネージド Apache Kafka クラスタから ApsaraMQ for Kafka インスタンスにデータを移行する方法について説明します。

移行プロセス

注意事項

  • Alibaba Cloud にデプロイされたセルフマネージド Apache Kafka クラスタのメタデータとメッセージデータを移行する場合、セルフマネージド Apache Kafka クラスタと同じリージョンに ApsaraMQ for Kafka インスタンスを購入し、同じ仮想プライベートクラウド (VPC) にデプロイしてから、VPC 内のメタデータとメッセージデータを移行することをお勧めします。

  • このトピックでは、セルフマネージド Apache Kafka クラスタのメタデータとメッセージデータは、インターネットおよび VPC 接続ApsaraMQ for Kafka インスタンスに移行されます。

手順 1:仕様の評価

ApsaraMQ for Kafka は、クラスタトラフィック、ディスク容量、ディスクタイプなど、セルフマネージド Apache Kafka クラスタの情報に基づいて、移行タスクに必要な ApsaraMQ for Kafka インスタンスの仕様を評価および推奨する仕様評価機能を提供します。詳細については、「仕様の評価」をご参照ください。

手順 2:インスタンスの購入

評価されたインスタンス仕様に基づいて ApsaraMQ for Kafka インスタンスを購入し、デプロイします。詳細については、「インターネットおよび VPC 接続インスタンスの購入とデプロイ」をご参照ください。

手順 3:トピックとコンシューマーグループの移行

移行の実装

  1. セルフマネージド Apache Kafka クラスタのサーバーにログオンします。

  2. JDK 8 または 11 をダウンロードしてインストールします。詳細については、「Java ダウンロード」を参照してください。

  3. 移行ツールをダウンロードします: kafka-migration-assessment.jar.

  4. 以下の方法を使用して、トピックとコンシューマーグループを移行します。

    トピックの移行

    1. 移行ツールが配置されているディレクトリで、次のコマンドを実行して、移行するトピックの事前チェックを実行します。

      java -jar kafka-migration-assessment.jar TopicMigrationFromZk  \
      --sourceZkConnect 192.168.XX.XX  \
      --destAk <yourdestAccessKeyId>  \
      --destSk <yourdestAccessKeySecret>  \
      --destRegionId <yourdestRegionId>  \
      --destInstanceId <yourdestInstanceId>

      パラメーター

      説明

      sourceZkConnect

      ソース ZooKeeper クラスタの IP アドレス。

      destAk

      移行先 ApsaraMQ for Kafka インスタンスが属する Alibaba Cloud アカウントの AccessKey ID。

      destSk

      移行先 ApsaraMQ for Kafka インスタンスが属する Alibaba Cloud アカウントの AccessKey Secret。

      destRegionId

      移行先 ApsaraMQ for Kafka インスタンスのリージョン ID。

      destInstanceId

      移行先 ApsaraMQ for Kafka インスタンスの ID。

      確認されるサンプルの戻り結果:

      13:40:08 INFO - Begin to migrate topics:[test]
      13:40:08 INFO - Total topic number:1
      13:40:08 INFO - Will create topic:test, isCompactTopic:false, partition number:1
    2. 次のコマンドを実行して、トピックを移行します。

      java -jar kafka-migration-assessment.jar TopicMigrationFromZk  \
      --sourceZkConnect 192.168.XX.XX  \
      --destAk <yourdestAccessKeyId>  \
      --destSk <yourdestAccessKeySecret>  \
      --destRegionId <yourdestRegionId>  \
      --destInstanceId <yourdestInstanceId>  \
      --commit

      パラメーター

      説明

      commit

      移行タスクをコミットします

      移行タスクのコミット後のサンプルの戻り結果:

      13:51:12 INFO - Begin to migrate topics:[test]
      13:51:12 INFO - Total topic number:1
      13:51:13 INFO - cmd=TopicMigrationFromZk, request=null, response={"code":200,"requestId":"7F76C7D7-AAB5-4E29-B49B-CD6F1E0F508B","success":true,"message":"operation success"}
      13:51:13 INFO - TopicCreate success, topic=test, partition number=1, isCompactTopic=false

    コンシューマーグループの移行

    1. kafka.properties という名前の構成ファイルを作成します。

      kafka.properties ファイルは、Kafka コンシューマーを初期化して、セルフマネージド Apache Kafka クラスタのコンシューマーオフセットを取得するために使用されます。ファイルには次の内容が含まれています。

      ## エンドポイント。
      bootstrap.servers=localhost:9092
      
      ## グループ ID。 使用が最初のメッセージから開始されるようにするには、グループにコンシューマーオフセット情報を含めることはできません。
      group.id=XXX
      
      ## セキュリティ設定が不要な場合は、以下のパラメータを設定する必要はありません。
      
      ## SASL ベースの認証。
      #sasl.mechanism=PLAIN
      
      ## アクセスプロトコル。
      #security.protocol=SASL_SSL
      
      ## Secure Sockets Layer (SSL) ルート証明書のパス。
      #ssl.truststore.location=/Users/***/Documents/code/aliware-kafka-demos/main/resources/kafka.client.truststore.jks
      
      ## SSL パスワード。
      #ssl.truststore.password=***
      
      ## SASL パス。
      #java.security.auth.login.config=/Users/***/kafka-java-demo/vpc-ssl/src/main/resources/kafka_client_jaas.conf
    2. 移行ツールが配置されているディレクトリで、次のコマンドを実行して、移行する コンシューマーグループ の事前チェックを実行します。

      java -jar kafka-migration-assessment.jar ConsumerGroupMigrationFromTopic  \
      --propertiesPath /usr/local/kafka_2.12-2.4.0/config/kafka.properties  \
      --destAk <yourAccessKeyId>  \
      --destSk <yourAccessKeySecret>  \
      --destRegionId <yourRegionId>  \
      --destInstanceId <yourInstanceId>

      パラメーター

      説明

      propertiesPath

      kafka.properties 構成ファイルのパス。

      destAk

      移行先 ApsaraMQ for Kafka インスタンスが属する Alibaba Cloud アカウントの AccessKey ID。

      destSk

      移行先 ApsaraMQ for Kafka インスタンスが属する Alibaba Cloud アカウントの AccessKey Secret。

      destRegionId

      移行先 ApsaraMQ for Kafka インスタンスのリージョン ID。

      destInstanceId

      移行先 ApsaraMQ for Kafka インスタンスの ID。

      確認されるサンプルの戻り結果:

      15:29:45 INFO - Will create consumer groups:[XXX, test-consumer-group]
    3. 次のコマンドを実行して、コンシューマーグループ を移行します。

      java -jar kafka-migration-assessment.jar ConsumerGroupMigrationFromTopic  \
      --propertiesPath /usr/local/kafka_2.12-2.4.0/config/kafka.properties  \
      --destAk <yourAccessKeyId>  \
      --destSk <yourAccessKeySecret>  \
      --destRegionId <yourRegionId>  \
      --destInstanceId <yourInstanceId>  \
      --commit

      パラメーター

      説明

      commit

      移行タスクをコミットします。

      移行タスクのコミット後のサンプルの戻り結果:

      15:35:51 INFO - cmd=ConsumerGroupMigrationFromTopic, request=null, response={"code":200,"requestId":"C9797848-FD4C-411F-966D-0D4AB5D12F55","success":true,"message":"operation success"}
      15:35:51 INFO - ConsumerCreate success, consumer group=XXX
      15:35:57 INFO - cmd=ConsumerGroupMigrationFromTopic, request=null, response={"code":200,"requestId":"3BCFDBF2-3CD9-4D48-92C3-385C8DBB9709","success":true,"message":"operation success"}
      15:35:57 INFO - ConsumerCreate success, consumer group=test-consumer-group

移行の進捗状況の表示

  1. ApsaraMQ for Kafka コンソール にログオンします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、Migration をクリックします。表示されるページで、メタデータのインポート タブをクリックします。

  3. メタデータのインポート タブで、移行タスクの移行先 ApsaraMQ for Kafka インスタンスを見つけ、トピックとコンシューマーグループの移行の進捗状況を表示します。

結果の確認

  1. ApsaraMQ for Kafka コンソール にログオンします。 リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、Instances をクリックします。

  3. [インスタンス] ページで、管理するインスタンスの名前をクリックします。

  4. インスタンスのトピックとグループを表示します。

    • 左側のナビゲーションウィンドウで、[トピック] をクリックします。表示されるページで、インスタンスに作成されたトピックを表示します。

    • 左側のナビゲーションウィンドウで、[グループ] をクリックします。表示されるページで、インスタンスに作成されたグループを表示します。

(オプション) 手順 4:データの移行

Kafka は、Kafka クラスタのデータをバックアップするためのミラーリング機能を提供します。 MirrorMaker を使用して、ソースクラスタのデータを宛先クラスタにコピーできます。次の図に示すように、MirrorMaker は組み込みのコンシューマーを使用してソースのセルフマネージド Apache Kafka クラスタからメッセージを消費し、次に組み込みのプロデューサーを使用してメッセージを宛先 ApsaraMQ for Kafka クラスタに送信します。詳細については、「クラスタ間のデータのミラーリングと地理的レプリケーション」を参照してください。

前提条件

注意事項

  • トピック名は一致している必要があります。

  • パーティションの数は異なっていても構いません。

  • 同じパーティション内のデータは、移行後も同じパーティションに残らない場合があります。

  • デフォルトでは、同じキーを持つメッセージは同じパーティションに配布されます。

  • ノードに障害が発生した場合、通常のメッセージは順序が狂う可能性がありますが、パーティション順序付けされたメッセージは順序を保持できます。

  • セルフマネージド Apache Kafka クラスタと ApsaraMQ for Kafka インスタンスの両方がパスワードベースの認証を使用し、2 つのパスワードが一致しない場合、移行はサポートされません。

移行の実装

インターネットまたは VPC からアクセスすることを選択できます。

インターネットからのアクセス

  1. SSL 証明書をダウンロードします: mix.4096.client.truststore.jks.

  2. kafka_client_jaas.conf ファイルを構成します。

    KafkaClient {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="your username" // ユーザー名
       password="your password"; // パスワード
    };
  3. producer.properties ファイルを構成します。

    ## ApsaraMQ for kafka インスタンスの SSL エンドポイント。エンドポイントは ApsaraMQ for Kafka コンソールで取得できます。
    bootstrap.servers=XXX.XXX.XXX.XXX:9093
    
    ## データ圧縮方式。
    compression.type=none
    
    ## 手順 1 でダウンロードしたファイルを使用してトラストストアを構成します。
    ssl.truststore.location=kafka.client.truststore.jks
    ssl.truststore.password=KafkaOnsClient
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    
    ## 次のパラメーターは、ApsaraMQ for Kafka 2.x インスタンスに簡易認証およびセキュリティ層 (SASL) 認証を使用する場合にのみ必要です。
    ssl.endpoint.identification.algorithm=
  4. java.security.auth.login.config ファイルを構成します。

    export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"                              
  5. 次のコマンドを実行して、移行プロセスを開始します。

    sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName

VPC からのアクセス

  1. consumer.properties ファイルを構成します。

    ## セルフマネージド Apache Kafka クラスタのエンドポイント。
    bootstrap.servers=XXX.XXX.XXX.XXX:9092
    
    ## パーティションにメッセージを配布するためのコンシューマーポリシー。
    partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
    
    ## コンシューマーグループの名前。
    group.id=test-consumer-group
  2. producer.properties ファイルを構成します。

    ## ApsaraMQ for Kafka インスタンスのデフォルトエンドポイント。エンドポイントは ApsaraMQ for Kafka コンソールで取得できます。
    bootstrap.servers=XXX.XXX.XXX.XXX:9092
    
    ## データ圧縮方式。
    compression.type=none                                
  3. 次のコマンドを実行して、移行プロセスを開始します。

    sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName

結果の確認

次のいずれかの方法を使用して、MirrorMaker が想定どおりに実行されているかどうかを確認できます。

  • kafka-consumer-groups.sh コマンドを使用して、セルフマネージド Apache Kafka クラスタのコンシューマーの進捗状況を確認します。

    bin/kafka-consumer-groups.sh --new-consumer --describe --bootstrap-server セルフマネージド Apache Kafka クラスタのエンドポイント --group test-consumer-group

  • セルフマネージド Apache Kafka クラスタにメッセージを送信します。 ApsaraMQ for Kafka コンソールで、トピックのパーティションステータスを確認し、現在のブローカーのメッセージの総数が正しいかどうかを確認します。 ApsaraMQ for Kafka コンソールでメッセージの内容を表示できます。詳細については、「メッセージのクエリ」をご参照ください。

次の手順

  1. ApsaraMQ for Kafka インスタンスの新しいコンシューマーグループを有効にして、インスタンス内のメッセージを消費します。

  2. ApsaraMQ for Kafka インスタンスの新しいプロデューサーを有効にし、元々のプロデューサーをシャットダウンし、元々のコンシューマーグループがセルフマネージド Apache Kafka クラスタのメッセージを引き続き消費できるようにします。

  3. セルフマネージド Apache Kafka クラスタ内のすべてのメッセージが元々のコンシューマーグループによって消費された後、それらとセルフマネージド Apache Kafka クラスタをシャットダウンします。