すべてのプロダクト
Search
ドキュメントセンター

EventBridge:Kafka Connect

最終更新日:Jan 11, 2025

Apache Kafka コネクタ用のカスタム Message Queue を使用すると、Apache RocketMQ クラスタからデータを抽出したり、Apache RocketMQ クラスタにデータをプッシュしたりできます。このトピックでは、カスタム Message Queue for Apache Kafka コネクタを使用する場合の基本用語と構成について説明します。

基本用語

コネクタ

コネクタは論理抽象化レイヤーに配置され、データソースとデータシンクを指定するために使用されます。コネクタは、データソースから Message Queue for Apache Kafka トピックにデータをコピーするか、Message Queue for Apache Kafka トピックからデータシンクにデータをコピーします。

タスク

タスクはステートレスなロジック実行単位です。各コネクタインスタンスは、データ転送のために複数のタスクを管理します。

ワーカー

ワーカーは、1 つ以上のコネクタインスタンススレッドとタスクスレッドを実行するプロセスです。Kafka Connect では、次の実行モードがサポートされています。
  • スタンドアロンモード: ワーカーは 1 つだけ起動されます。すべてのワークロードはワーカーで実行されます。このモードはフォールトトレラントではありません。
  • 分散モード: 複数のワーカーが起動されます。すべてのワーカーは同じ group.id を使用し、ワーカークラスタを構成します。Message Queue for Apache Kafka のコンシューマーグループで使用されるリバランスポリシーと同様のリバランスポリシーを使用して、コネクタを管理し、ワーカー間でタスクをスケジュールします。このモードはスケーラブルでフォールトトレラントです。ワーカーを追加または終了した場合、またはワーカーが予期せず失敗した場合、他のワーカーは変更を検出し、リバランスを実行してコネクタとタスクを再配布します。
    分散モードを使用する場合は、次の項目を構成する必要があります:
    • plugin.path: Kafka Connect の構成項目。パスは、Kafka Connect の起動時にコネクタの実行可能コンテンツをアドレス指定するために使用されます。実行可能コンテンツは、JAR または依存関係にすることができます。複数のパスを構成できます。複数のパスはコンマ (,) で区切ります。例:
      /usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors

      上記の例では、最初のディレクトリには、プラグインとプラグインのサードパーティ依存関係が依存するすべてのクラスファイルの uber-JAR が含まれています。最後のディレクトリには、すべての JAR とプラグインに必要なサードパーティ依存関係が含まれています。

    • group.id: Connect クラスタグループで使用される一意の ID。この ID は、Message Queue for Apache Kafka のコンシューマーグループの ID と同じにすることはできません。デフォルト値: connect-cluster。
    • config.storage.topic: コネクタとタスクに関する構成情報を格納するために使用されるトピック。トピックには、1 つのパーティションと複数のレプリカが含まれている必要があります。システムによって自動的に作成されるトピックには複数のパーティションが含まれている可能性があるため、トピックを手動で作成する必要があります。
    • offset.storage.topic: オフセットに関する情報を格納するために使用されるトピック。トピックには、複数のパーティションとレプリカが含まれている必要があります。デフォルト値: connect-offsets。
    • status.storage.topic: ステータスに関する情報を格納するために使用されるトピック。このトピックには、複数のパーティションとレプリカを含めることができます。デフォルト値: connect-status。

コンバーター

コンバーターは、データ形式を変換するために使用されるコンポーネントです。コンバーターを使用すると、Message Queue for Apache Kafka と他の外部サービス間でメッセージデータをシリアル化および逆シリアル化して、データ形式と構造の互換性を確保できます。コンバーターは、ワーカーとコネクタに対して構成できます。コネクタのコンバーター構成は、ワーカーのコンバーター構成を上書きできます。コンバーターを使用して、Avro、Protobuf、String、JSON、JSON Schema、ByteArray などのデータ形式間で変換できます。

コネクタの構成

重要 EventBridge コンソールと ZIP ファイルの両方でコネクタを構成した場合、コンソールの構成が ZIP ファイルの構成を上書きします。
パラメーター (必須)説明
nameコネクタの名前。ほとんどの場合、名前は ISO 制御文字を含まない文字列です。mongo-sink
connector.classコネクタのクラス名またはエイリアス。クラスは org.apache.kafka.connect.connector.Connector のサブクラスである必要があります。com.mongodb.kafka.connect.MongoSinkConnector
task.maxタスクの最大数。有効な値: 1 から Message Queue for Apache Kafka トピックの最大パーティション数まで。1
topicsMessage Queue for Apache Kafka パラメーターシンクコネクト に設定されている場合、ソース トピックを指定します。複数のトピックはコンマ (,) で区切ります。sourceA,sourceB

オプションのパラメーターについては、3.5 Kafka Connect の構成 をご参照ください。

次のコードは、データシンクが jdbc であるコネクタを構成する方法を示しています:
name=testConnector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=connect-test