ストリーミングデータの処理中に、Kafka と他のシステム間のデータ同期、または Kafka クラスター間のデータ移行が必要になることがよくあります。 このページでは、Kafka Connect を使用して、Kafka クラスター間でデータを移行する方法について説明します。

始める前に

  • Alibaba Cloud アカウントに登録している必要があります。 詳細については、「Alibaba Cloud アカウントの作成」をご参照ください。
  • E-MapReduce をアクティブにする必要があります。
  • Alibaba Cloud アカウントの権限が付与されている必要があります。 詳細については、「役割の権限付与」をご参照ください。

このタスクについて

Kafka Connect は、Kafka と他のシステム間でストリーミングデータを高速で送信するためのスケーラブルで信頼性の高いツールです。 たとえば、Kafka Connect を使用して、データベースから binlog データを取得し、データベースのデータを Kafka クラスターに移行できます。 この方法で、データベースのデータを移行し、データベースをダウンストリームストリーミングデータ処理システムに間接的に接続できます。 Kafka Connect は、Kafka Connect コネクターの作成と管理に役立つ Representational State Transfer (REST) アプリケーションプログラミングインターフェイス (API) も提供します。

Kafka Connect は、スタンドアロンモードまたは分散モードで実行できます。 スタンドアロンモードでは、すべてのワーカーが同じプロセスで実行されます。 スタンドアロンモードと比較して、分散モードはよりスケーラブルでフォールトトレラントです。 最も一般的に使用されるモードです。実稼働環境用に推奨します。

このページでは、Kafka Connect の REST API を呼び出して、Kafka Connect が分散モードで実行される Kafka クラスター間でデータを移行する方法について説明します。

ステップ 1:クラスターの作成

E-MapReduce でソース Kafka クラスターとターゲット Kafkaクラスターを作成します。 Kafka Connect がタスクノードにインストールされます。 そのため、ターゲット Kafka クラスターにタスクノードを作成する必要があります。 クラスターの作成後、デフォルトで Kafka Connect はタスクノードで開始されます。 ポート番号は 8083 です。

ソース Kafka クラスターとターゲット Kafka クラスターを同じセキュリティグループに追加することをお勧めします。 ソース Kafka クラスターとターゲットKafka クラスターが異なるセキュリティグループに属している場合、2 つのクラスターはデフォルトで相互にアクセスできません。 相互アクセスを許可するには、セキュリティグループで必要とされる設定を変更しなければなりません。

  1. Alibaba Cloud E-MapReduce コンソール にログインします。
  2. ソース Kafka クラスターとターゲット Kafka クラスターを作成します。 詳細は、「クラスターの作成」をご参照ください。
    ターゲットKafkaクラスターを作成する場合、 タスクインスタンス 、つまりタスクノードを設定する必要があります。
    Kafka クラスターの作成

ステップ2:移行するデータを保存するためのトピックの作成

ソース Kafka クラスターに connect というトピックを作成します。

  1. Secure Shell (SSH) を使用して、ソース Kafka クラスターのヘッダーノードにログインします。 この例では、ヘッダーノードは emr-header-1 です。
  2. connect というトピックを作成するために以下のコマンドをルートとして実行します。
    kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 2 --partitions 10 --topic connect
    Topic の作成
    上記の操作を実行した後、後で使用するためにログインウィンドウを保持します。

ステップ3:Kafka Connect コネクターの作成

ターゲットKafka クラスターのタスクノードで、curl コマンドを実行し、JavaScript Object Notation (JSON) データを使用して Kafka Connect コネクターを作成します。

  1. SSH を使用して、ターゲット Kafka クラスターのタスクノードにログインします。 この例では、タスクノードは emr-worker-3 です。
  2. オプション:Kafka Connect の設定をカスタマイズします。

    ターゲット Kafka クラスターの Kafka サービスの [設定] ページを開きます。 connect-distributed.propertiesoffset.storage.topicconfig.storage.topicstatus.storage.topic パラメータをカスタマイズします。 詳細は、「パラメーターの設定」をご参照ください。

    Kafka Connect は、オフセット、設定、タスクステータスを offset.storage.topicconfig.storage.topicstatus.storage.topic パラメータで指定されるトピックにそれぞれ保存します。 Kafka Connect は、/etc/ecm/kafka-conf/connect-distributed.properties に保存されているデフォルトのパーティションとレプリケーション係数を使用して、これらのトピックを自動的に作成します。

  3. root ユーザーとして次のコマンドを実行して、Kafka Connect コネクターを作成します。
    curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-test", "config": { "connector.class": "EMRReplicatorSourceConnector", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092", "src.zookeeper.connect": "${src-kafka-curator-ip}:2181", "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181", "topic.whitelist": "${source-topic}", "topic.rename.format": "${dest-topic}", "src.kafka.max.poll.records": "300" } }' http://emr-worker-3:8083/connectors

    JSON データでは、[名前] フィールドは、作成する Kafka Connect コネクター名を示します。この例では connect-test です。 config フィールドは、実際の要件に基づいて設定する必要があります。 次の表は、config フィールドの主要な変数を説明しています。

    変数 説明
    ${source-topic} 移行するデータをソース Kafka クラスターに保存するためのトピック。 たとえば、connect。 複数入力するときは、コンマ ( , ) で区切ります。
    ${dest-topic} ターゲット Kafka クラスターでデータが移行されるトピック。 たとえば、connect.replica
    ${src-kafka-curator-hostname} ソース Kafka クラスターで ZooKeeper サービスがインストールされているノードの内部 IP アドレス。
    ${dest-kafka-curator-hostname} ZooKeeper サービスがターゲット Kafka クラスターにインストールされているノードの内部 IP アドレス。
    上記の操作を実行した後、後で使用するためにログインウィンドウを保持します。

ステップ 4:Kafka Connect コネクターとタスクノードのステータスの表示

Kafka Connect コネクターとタスクノードのステータスを表示し、それらが正常なステータスであることを確認します。

  1. ターゲット Kafka クラスターのタスクノードのログインウィンドウに戻ります。 この例では、タスクノードは emr-worker-3 です。
  2. root ユーザーとして次のコマンドを実行して、すべての Kafka Connect コネクターを表示します。
    curl emr-worker-3:8083/connectors
    すべての Kafka Connect コネクタを表示
  3. root ユーザーとして次のコマンドを実行して、この例で作成された Kafka Connect コネクターのステータス、つまり connect-testを表示します。
    curl emr-worker-3:8083/connectors/connect-test/status
    接続テストのステータスを表示する

    Kafka Connect コネクター (この例では connect-test) は必ずRUNNING ステータスにしてください。

  4. rootユーザーとして次のコマンドを実行して、タスクノードの詳細を表示します。
    curl emr-worker-3:8083/connectors/connect-test/tasks
    タスクノードの詳細の表示

    タスクノードに関するエラーメッセージが返されないことを確認します。

ステップ5:移行データの生成

移行するデータをソース Kafka クラスターの connect トピックに送信します。

  1. ソースKafkaクラスターのヘッダーノードのログオンウィンドウに戻ります。 この例ではヘッダーノードは emr-header-1 です。
  2. root ユーザーとして以下のコマンドを実行してデータを connect トピックに送信します。
    kafka-producer-perf-test.sh --topic connect --num-records 100000 --throughput 5000 --record-size 1000 --producer-props bootstrap.servers=emr-header-1:9092
    移行データの生成

ステップ6:データ移行の結果表示

移行データが生成されると、Kafka Connect はターゲット Kafka クラスター内の対応するトピックにデータを自動的に移行します。 この例では、トピックは connect.replica です。

  1. ターゲット Kafka クラスターのタスクノードのログインウィンドウに戻ります。 この例では、託すノードは emr-worker-3 です。
  2. root ユーザーとして次のコマンドを実行して、データが移行されているかどうかを確認します。
    kafka-consumer-perf-test.sh --topic connect.replica --broker-list emr-header-1:9092 --messages 100000
    データ移行の結果表示

    上図に示されるコマンド出力によると、ソース Kafka クラスターに送信された 100,000 のメッセージは、ターゲット Kafka クラスターに移行されます。

概要

このページでは、Kafka Connect を使用して Kafka クラスター間でデータを移行する方法について説明します。 Kafka Connect の詳細については、『Kafka 公式ウェブサイト』および 『REST API』をご覧ください。