このトピックでは、Kafka サービスを含む E-MapReduce (EMR) クラスタを作成し、Kafka トピックと Kafka Connect サービスを使用する方法について説明します。EMR Kafka クラスタを使い始めるのに役立ちます。
注意事項
EMR Kafka クラスタを作成する場合は、Elastic Compute Service (ECS) インスタンスタイプを選択し、ビジネスの評価負荷に基づいてブローカーの数を確認する必要があります。ビジネスシナリオは多岐にわたるため、一般的なクラスタプランは提供できません。実際の環境に基づいてクラスタを作成する必要があります。ほとんどの場合、インスタンスタイプを選択する際には、次の項目を考慮することをお勧めします。
CPU とメモリの比率が 1:4 の ECS インスタンスに Kafka ブローカーをデプロイします。
クラウドディスクを使用してデータを保存します。
クラウドディスクの I/O スループットとネットワークインターフェースカード (NIC) 帯域幅の関係を考慮します。
デプロイメントパラメータを設定する際には、次の要素を考慮してください。
EMR で使用される Kafka のバージョンは、ZooKeeper サービスによって異なります。ZooKeeper の可用性によって、Kafka サービスの高可用性が決まります。クラスタを作成するときは、高可用性を有効にすることをお勧めします。クラスタの作成時に高可用性を有効にすると、ZooKeeper サービス用に 3 つのノードがデプロイされます。
マスターノードグループが ZooKeeper のデプロイにのみ使用される場合は、マスターノードグループに 1 つのデータディスクのみを設定する必要があります。
評価ベースの提案の詳細については、「クラスタリソースを評価するための提案」をご参照ください。
EMR Kafka クラスタを作成する
このセクションでは、Kafka クラスタを作成する方法について説明します。詳細については、「クラスタの作成」をご参照ください。
クラスタ作成ページに移動します。
EMR コンソール にログオンします。左側のナビゲーションウィンドウで、[ECS 上の EMR] をクリックします。
[ECS 上の EMR] ページで、[クラスタの作成] をクリックします。
[ソフトウェア設定] ステップで、必要な Kafka バージョンに基づいて EMR バージョンを選択します。
[高可用性] をオンにして、ZooKeeper サービス用に 3 つのノードをデプロイできるようにします。
重要クラスタの作成時に高可用性をオンにすると、ZooKeeper サービス用にマスターノードグループに 3 つのノードがデプロイされます。EMR で使用される Kafka のバージョンは、ZooKeeper サービスによって異なります。クラスタを作成するときは、高可用性をオンにすることをお勧めします。
[ハードウェア設定] ステップで、Elastic Compute Service (ECS) インスタンスの適切な仕様とノードの数を選択します。
仕様: コアノードグループには、CPU とメモリの比率が 1:4 のインスタンスタイプを選択します。
ノード数: コアノードグループのノード数を、Kafka パーティションレプリカの数に 1 を加えた数に設定します。これにより、リソースの冗長性が十分に確保されます。たとえば、計画されているレプリカの数が 3 つである場合は、ノードの数を 4 に設定します。
[基本設定] ステップで、ビジネス要件に基づいてパラメータを設定します。
Kafka トピックを使用する
このセクションでは、Kafka トピックを使用してデータを作成および使用する方法について説明します。実際のビジネスシナリオでは、Kafka Manager や Cruise Control などのソフトウェアを使用してクラスタを管理することもできます。
SSH モードで Kafka クラスタのマスターノードにログオンします。詳細については、「クラスタへのログオン」をご参照ください。
次のコマンドを実行して、Kafka トピックを作成します。
sudo su - kafka kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic test --create
次のコマンドを実行して、Kafka トピックの詳細を表示します。
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test --describe
次のコマンドを実行して、データを作成します。
kafka-console-producer.sh --broker-list core-1-1:9092 --topic test
データが作成された後、必要な情報を入力し、Enter キーを押してトピックにデータを送信できます。
新しい端末ウィンドウを開き、次のコマンドを実行してデータを使用します。
kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --topic test --from-beginning --group test-consumer-group
Kafka Connect サービスを使用する
KafkaConnect コンポーネントは、EMR V3.41.0 以降または EMR V5.7.0 以降のマイナーバージョンでのみデプロイできます。このセクションでは、Kafka Connect サービスを使用する方法について説明します。
[ノード] タブに移動します。
EMR コンソール にログオンします。左側のナビゲーションウィンドウで、[ECS 上の EMR] をクリックします。
上部のナビゲーションバーで、クラスタが存在するリージョンを選択し、ビジネス要件に基づいてリソースグループを選択します。
[ECS 上の EMR] ページで、スケールアウトするクラスタを見つけ、[アクション] 列の [ノード] をクリックします。
KafkaConnect コンポーネントをデプロイするためのノードグループを作成します。
KafkaConnect コンポーネントは、EMR タスクノードグループにデプロイされます。EMR Kafka クラスタにタスクノードグループを作成すると、EMR はノードグループに Kafka Connect クラスタを自動的に作成します。
EMR タスクノードグループを作成します。
クラスタ詳細ページの [ノード] タブで、[ノードグループの追加] をクリックして、タスクノードグループを作成します。詳細については、「ノードグループを作成する」セクションの「ノードグループの管理」Topic をご参照ください。
タスクノードグループをスケールアウトします。
ビジネス要件に基づいて、タスクノードグループの ECS インスタンスの数を増やすことができます。詳細については、「EMR クラスタのスケールアウト」をご参照ください。
KafkaConnect コンポーネントのステータスを確認します。Kafka Connect クラスタが起動していることを確認します。
Kafka サービスページの [ステータス] タブに移動します。[コンポーネント] セクションで、[kafka Connect] コンポーネントのステータスを表示します。コンポーネントが実行されていることを確認します。
Kafka Connect Rest サービスのステータスを確認します。
SSH モードで Kafka クラスタのマスターノードにログオンします。詳細については、「クラスタへのログオン」をご参照ください。
次のコマンドを実行して、Kafka Connect Rest サービスのステータスを確認します。
curl -X GET http://task-1-1:8083| jq .
次のような出力が返されます。
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 91 100 91 0 0 13407 0 --:--:-- --:--:-- --:--:-- 15166 { "version": "2.4.1", "commit": "42ce056344c5625a", "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****" }
Kafka Connect サービスを使用してデータを移行します。
Kafka クラスタで MirrorMaker 2.0 (MM2) タスクを開始して、データを複製および移行できます。詳細については、「Kafka MM2 を使用してクラスタ間でデータを同期する」をご参照ください。
参照資料
Secure Sockets Layer (SSL) を有効にし、SSL を使用して Kafka に接続する方法については、「SSL を使用して Kafka データを暗号化する」をご参照ください。
簡易認証およびセキュリティ層 (SASL) を設定し、SASL を使用して Kafka に接続する方法については、「SASL を使用して Kafka クラスタにログオンする」をご参照ください。