ストリーミングデータの処理中に、Kafka と他のシステム間のデータ同期、または Kafka クラスター間のデータ移行が必要になることがよくあります。 このページでは、Kafka
Connect を使用して、Kafka クラスター間でデータを移行する方法について説明します。
始める前に
- Alibaba Cloud アカウントに登録している必要があります。 詳細については、「Alibaba Cloud アカウントの作成」をご参照ください。
- E-MapReduce をアクティブにする必要があります。
- Alibaba Cloud アカウントの権限が付与されている必要があります。 詳細については、「役割の権限付与」をご参照ください。
このタスクについて
Kafka Connect は、Kafka と他のシステム間でストリーミングデータを高速で送信するためのスケーラブルで信頼性の高いツールです。 たとえば、Kafka
Connect を使用して、データベースから binlog データを取得し、データベースのデータを Kafka クラスターに移行できます。 この方法で、データベースのデータを移行し、データベースをダウンストリームストリーミングデータ処理システムに間接的に接続できます。
Kafka Connect は、Kafka Connect コネクターの作成と管理に役立つ Representational State Transfer (REST)
アプリケーションプログラミングインターフェイス (API) も提供します。
Kafka Connect は、スタンドアロンモードまたは分散モードで実行できます。 スタンドアロンモードでは、すべてのワーカーが同じプロセスで実行されます。 スタンドアロンモードと比較して、分散モードはよりスケーラブルでフォールトトレラントです。
最も一般的に使用されるモードです。実稼働環境用に推奨します。
このページでは、Kafka Connect の REST API を呼び出して、Kafka Connect が分散モードで実行される Kafka クラスター間でデータを移行する方法について説明します。
ステップ 1:クラスターの作成
E-MapReduce でソース Kafka クラスターとターゲット Kafkaクラスターを作成します。 Kafka Connect がタスクノードにインストールされます。
そのため、ターゲット Kafka クラスターにタスクノードを作成する必要があります。 クラスターの作成後、デフォルトで Kafka Connect はタスクノードで開始されます。
ポート番号は 8083 です。
ソース Kafka クラスターとターゲット Kafka クラスターを同じセキュリティグループに追加することをお勧めします。 ソース Kafka クラスターとターゲットKafka
クラスターが異なるセキュリティグループに属している場合、2 つのクラスターはデフォルトで相互にアクセスできません。 相互アクセスを許可するには、セキュリティグループで必要とされる設定を変更しなければなりません。
- Alibaba Cloud E-MapReduce コンソール にログインします。
- ソース Kafka クラスターとターゲット Kafka クラスターを作成します。 詳細は、「クラスターの作成」をご参照ください。
注 ターゲットKafkaクラスターを作成する場合、 タスクインスタンス 、つまりタスクノードを設定する必要があります。
ステップ2:移行するデータを保存するためのトピックの作成
ソース Kafka クラスターに connect というトピックを作成します。
- Secure Shell (SSH) を使用して、ソース Kafka クラスターのヘッダーノードにログインします。 この例では、ヘッダーノードは emr-header-1 です。
- connect というトピックを作成するために以下のコマンドをルートとして実行します。
kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 2 --partitions 10 --topic connect
注 上記の操作を実行した後、後で使用するためにログインウィンドウを保持します。
ステップ3:Kafka Connect コネクターの作成
ターゲットKafka クラスターのタスクノードで、curl コマンドを実行し、JavaScript Object Notation (JSON) データを使用して Kafka Connect コネクターを作成します。
- SSH を使用して、ターゲット Kafka クラスターのタスクノードにログインします。 この例では、タスクノードは emr-worker-3 です。
- オプション:Kafka Connect の設定をカスタマイズします。
ターゲット Kafka クラスターの Kafka サービスの [設定] ページを開きます。 connect-distributed.properties の offset.storage.topic、config.storage.topic、status.storage.topic パラメータをカスタマイズします。 詳細は、「パラメーターの設定」をご参照ください。
Kafka Connect は、オフセット、設定、タスクステータスを offset.storage.topic、config.storage.topic、status.storage.topic パラメータで指定されるトピックにそれぞれ保存します。 Kafka Connect は、/etc/ecm/kafka-conf/connect-distributed.properties に保存されているデフォルトのパーティションとレプリケーション係数を使用して、これらのトピックを自動的に作成します。
- root ユーザーとして次のコマンドを実行して、Kafka Connect コネクターを作成します。
curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-test", "config": { "connector.class": "EMRReplicatorSourceConnector", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092", "src.zookeeper.connect": "${src-kafka-curator-ip}:2181", "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181", "topic.whitelist": "${source-topic}", "topic.rename.format": "${dest-topic}", "src.kafka.max.poll.records": "300" } }' http://emr-worker-3:8083/connectors
JSON データでは、[名前] フィールドは、作成する Kafka Connect コネクター名を示します。この例では connect-test です。 config フィールドは、実際の要件に基づいて設定する必要があります。 次の表は、config フィールドの主要な変数を説明しています。
変数 |
説明 |
${source-topic} |
移行するデータをソース Kafka クラスターに保存するためのトピック。 たとえば、connect。 複数入力するときは、コンマ ( , ) で区切ります。
|
${dest-topic} |
ターゲット Kafka クラスターでデータが移行されるトピック。 たとえば、connect.replica。
|
${src-kafka-curator-hostname} |
ソース Kafka クラスターで ZooKeeper サービスがインストールされているノードの内部 IP アドレス。 |
${dest-kafka-curator-hostname} |
ZooKeeper サービスがターゲット Kafka クラスターにインストールされているノードの内部 IP アドレス。 |
注 上記の操作を実行した後、後で使用するためにログインウィンドウを保持します。
ステップ 4:Kafka Connect コネクターとタスクノードのステータスの表示
Kafka Connect コネクターとタスクノードのステータスを表示し、それらが正常なステータスであることを確認します。
- ターゲット Kafka クラスターのタスクノードのログインウィンドウに戻ります。 この例では、タスクノードは emr-worker-3 です。
- root ユーザーとして次のコマンドを実行して、すべての Kafka Connect コネクターを表示します。
curl emr-worker-3:8083/connectors
- root ユーザーとして次のコマンドを実行して、この例で作成された Kafka Connect コネクターのステータス、つまり connect-testを表示します。
curl emr-worker-3:8083/connectors/connect-test/status
Kafka Connect コネクター (この例では connect-test) は必ずRUNNING ステータスにしてください。
- rootユーザーとして次のコマンドを実行して、タスクノードの詳細を表示します。
curl emr-worker-3:8083/connectors/connect-test/tasks
タスクノードに関するエラーメッセージが返されないことを確認します。
ステップ5:移行データの生成
移行するデータをソース Kafka クラスターの connect トピックに送信します。
- ソースKafkaクラスターのヘッダーノードのログオンウィンドウに戻ります。 この例ではヘッダーノードは emr-header-1 です。
- root ユーザーとして以下のコマンドを実行してデータを connect トピックに送信します。
kafka-producer-perf-test.sh --topic connect --num-records 100000 --throughput 5000 --record-size 1000 --producer-props bootstrap.servers=emr-header-1:9092
ステップ6:データ移行の結果表示
移行データが生成されると、Kafka Connect はターゲット Kafka クラスター内の対応するトピックにデータを自動的に移行します。 この例では、トピックは
connect.replica です。
- ターゲット Kafka クラスターのタスクノードのログインウィンドウに戻ります。 この例では、託すノードは emr-worker-3 です。
- root ユーザーとして次のコマンドを実行して、データが移行されているかどうかを確認します。
kafka-consumer-perf-test.sh --topic connect.replica --broker-list emr-header-1:9092 --messages 100000
上図に示されるコマンド出力によると、ソース Kafka クラスターに送信された 100,000 のメッセージは、ターゲット Kafka クラスターに移行されます。
概要
このページでは、Kafka Connect を使用して Kafka クラスター間でデータを移行する方法について説明します。 Kafka Connect の詳細については、『Kafka 公式ウェブサイト』および 『REST API』をご覧ください。