ApsaraDB for SelectDB は、Doris Kafka Connector を使用して Kafka のデータに自動的にサブスクライブし、Kafka からデータを同期できます。このトピックでは、Doris Kafka Connector を使用して Kafka データソースから ApsaraDB for SelectDB インスタンスにデータを同期する方法について説明します。
背景情報
Kafka Connect は、Apache Kafka と他のシステム間でデータを転送するための信頼性の高いツールです。Kafka データソースとの間でデータをインポートまたはエクスポートするコネクタを定義できます。
Doris Kafka Connector は、Apache Doris によって提供され、Kafka Connect クラスタで実行されます。Doris Kafka Connector は、Kafka Topic からデータを読み取り、ApsaraDB for SelectDB に書き込むことができます。
ビジネスシナリオでは、Debezium Connector を使用してデータベース内の更新データを Kafka にプッシュしたり、API 操作を呼び出して JSON 形式のデータを Kafka にリアルタイムで書き込んだりできます。その後、Doris Kafka Connector を使用して Kafka のデータに自動的にサブスクライブし、ApsaraDB for SelectDB インスタンスにデータを同期できます。
Kafka Connect の動作モード
Kafka Connect は、スタンドアロンモードと分散モードを提供します。ビジネス要件に基づいて操作モードを使用できます。
スタンドアロンモード
本番環境ではスタンドアロンモードを使用しないことをお勧めします。
スタンドアロンモードを構成する
connect-standalone.properties ファイルを構成します。
# ブローカーの IP アドレスを変更します。
bootstrap.servers=127.0.0.1:9092Kafka 設定ディレクトリに connect-selectdb-sink.properties ファイルを作成し、ファイル内で以下の項目を設定します。
name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8030
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverterスタンドアロンモードを有効にする
$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties分散モード
分散モードを構成する
connect-distributed.properties ファイルを構成します。
# ブローカーの IP アドレスを変更します。
bootstrap.servers=127.0.0.1:9092
# グループ ID を変更します。グループ ID は、同じクラスタ内で一貫している必要があります。
group.id=connect-cluster分散モードを有効にする
$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.propertiesコネクタを構成する
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-selectdb-sink-cluster",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"topic_test",
"doris.topic2table.map": "topic_test:test_kafka_tbl",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
"doris.user":"admin",
"doris.password":"***",
"doris.database":"test_db",
"doris.http.port":"8080",
"doris.query.port":"9030",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'パラメーター
パラメーター | 説明 |
名前 | コネクタの名前。ほとんどの場合、名前は ISO 制御文字を含まない文字列であり、Kafka Connect クラスター内で一意である必要があります。 |
connector.class | コネクタのクラス名またはエイリアス。値を |
トピック | トピック (データソースとして機能する) の名前。複数のトピック名を指定する場合は、コンマ (,) で区切ります。 |
doris.topic2table.map | トピックとテーブル間のマッピング。複数のマッピング関係はコンマ (,) で区切ります。例: |
buffer.count.records | Kafka パーティションごとに、データが ApsaraDB for SelectDB インスタンスにフラッシュされる前にメモリにバッファリングされるデータレコードの数です。デフォルト値:10000。 |
buffer.flush.time | メモリ内のバッファーがリフレッシュされる間隔です。単位:秒。既定値:120。 |
buffer.size.bytes | Kafka パーティションごとにメモリにバッファリングされるデータレコードの累積サイズです。単位:バイト。既定値:5000000。 |
doris.urls | ApsaraDB for SelectDB インスタンスのエンドポイント。 ApsaraDB for SelectDB インスタンスのエンドポイントを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンします。インスタンスの[インスタンスの詳細] ページに移動し、[ネットワーク情報] セクションでエンドポイントを表示します。 selectdb-cn4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030 |
doris.http.ポート | ApsaraDB for SelectDB インスタンスの HTTP ポート番号。デフォルト値:8080。 |
doris.query.ポート | ApsaraDB for SelectDB インスタンスの MySQL ポート番号。デフォルト値:9030。 |
doris.user | ApsaraDB for SelectDB インスタンスに接続するために使用するユーザー名。 |
doris.password | ApsaraDB for SelectDB インスタンスに接続するために使用するユーザー名のパスワード。 |
doris データベース | ApsaraDB for SelectDB インスタンスでデータを書き込むデータベースの名前。 |
key.converter | コンバーター クラスは、JSON 形式のデータ内の特定のキーを変換するためのものです。 |
value.converter | JSON 形式のデータの特定の値を変換するためのコンバータークラスです。 |
jmx | Doris Kafka Connector を監視するために Java Management Extensions (JMX) を使用するかどうかを指定します。詳細については、「JMX を使用して Doris Kafka Connector を監視する」をご参照ください。デフォルト値:TRUE。 |
有効。削除 | 同時にレコードを削除するかどうかを指定します。既定値: false。 |
プレフィックス | Stream Load を使用してデータをインポートするときに使用される label のプレフィックス。デフォルト値は、コネクタの名前です。 |
auto.redirect | Stream Load リクエストをリダイレクトするかどうかを指定します。自動リダイレクト機能を有効にすると、Stream Load リクエストは、フロントエンド ( FE ) 経由でデータが書き込まれるバックエンド ( BE ) にリダイレクトされます。 BE 情報は表示されなくなります。 |
load.model | データのインポート モード。有効な値:
Default value: |
sink.properties.* | Stream Load を使用してデータをインポートするためのパラメーター。 たとえば、 詳細については、「Stream Load を使用してデータをインポートする」をご参照ください。 |
配信保証 | 消費された Kafka データを ApsaraDB for SelectDB にインポートする際にデータ整合性を確保するために使用されるメソッド。有効な値:
|
enable.2pc | 2 フェーズコミットモードを有効にするかどうかを指定します。 1 回限りのセマンティクスを確保するために、2 フェーズコミットモードを有効にすることができます。 |
Kafka Connect Sink の他の一般的な設定項目の詳細については、Kafka 3.7 ドキュメントの[コネクタの設定]セクションをご参照ください。
例
環境を準備する
Apache Kafka クラスター (バージョン 2.4.0 以降) または Confluent Cloud をインストールします。この例では、スタンドアロンの Kafka クラスターを使用します。
# パッケージをダウンロードして解凍します。 wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz tar -zxvf kafka_2.12-2.4.0.tgz cd kafka_2.12-2.4.0/ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties bin/kafka-server-start.sh -daemon config/server.propertiesdoris-kafka-connector-1.0.0.jar パッケージをダウンロードし、JAR パッケージを KAKFA_HOME/libs ディレクトリに保存します。
ApsaraDB for SelectDB インスタンスを作成します。詳細については、「インスタンスを作成する」をご参照ください。
ApsaraDB for SelectDBインスタンスへの接続 インスタンスに MySQL プロトコル経由で接続します。詳細については、「」をご参照ください。
テスト データベースとテスト テーブルを作成します。
Execute the following statement to create a test database: 次のステートメントを実行して、テストデータベースを作成します。
CREATE DATABASE test_db;Execute the following statements to create a test table: 次のステートメントを実行して、テストテーブルを作成します。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
例 1: JSON 形式のデータを同期する
SelectDB シンク用の ApsaraDB を構成します。
この例では、スタンドアロンモードを使用します。 Kafka 設定ディレクトリに selectdb-sink.properties ファイルを作成し、ファイル内で以下の項目を設定します。
name=selectdb_sink connector.class=org.apache.doris.kafka.connector.DorisSinkConnector topics=test_topic doris.topic2table.map=test_topic:example_tbl buffer.count.records=10000 buffer.flush.time=120 buffer.size.bytes=5000000 doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com doris.http.port=8080 doris.query.port=9030 doris.user=admin doris.password=*** doris.database=test_db key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.json.JsonConverter # オプション。デッドレターキューを構成します。 errors.tolerance=all errors.deadletterqueue.topic.name=test_error errors.deadletterqueue.context.headers.enable = true errors.deadletterqueue.topic.replication.factor=1Kafka Connect を起動します。
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties
例 2: Debezium Connector を使用して MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する
ビジネス シナリオによっては、ビジネス データベースからデータをリアルタイムで同期する必要があります。この場合、変更データキャプチャ (CDC) メカニズムが必要です。
Debezium Connector は、Kafka Connect に基づいて開発された CDC ツールです。Debezium Connector は、MySQL、PostgreSQL、SQL Server、Oracle、MongoDB など、さまざまなデータベースに接続し、データベースから Kafka Topic に統一されたフォーマットで継続的にデータを送信して、ダウンストリーム シンクがリアルタイムで消費できるようにします。この例では、MySQL データソースを使用します。
Debezium Connector パッケージをダウンロードします。
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gzダウンロードしたパッケージを解凍します。
tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gzKAKFA_HOME/libs ディレクトリにすべての抽出された JAR パッケージを格納します。
MySQL データソースを構成します。
Kafka 設定ディレクトリに mysql-source.properties ファイルを作成し、ファイル内で以下の項目を設定します。
name=mysql-source connector.class=io.debezium.connector.mysql.MySqlConnector database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com database.port=3306 database.user=testuser database.password=**** database.server.id=1 # Kafka 内のクライアントの一意の識別子。 database.server.name=test123 # データが同期されるデータベースとテーブル。デフォルトでは、すべてのデータベースとテーブルからデータが同期されます。 database.include.list=test table.include.list=test.test_table database.history.kafka.bootstrap.servers=localhost:9092 # データベースとテーブルのスキーマ変更イベントを保存するために使用される Kafka トピック。 database.history.kafka.topic=dbhistory transforms=unwrap # 詳細については、https://debezium.io/documentation/reference/stable/transformations/event-flattening.html をご覧ください。 transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState # DELETE 操作のレコード変更イベント。 transforms.unwrap.delete.handling.mode=rewrite構成が完了すると、Kafka Topic はデフォルトで
SERVER_NAME.DATABASE_NAME.TABLE_NAMEの形式で命名されます。説明詳細については、「MySQL 用 Debezium コネクタ」をご参照ください。
ApsaraDB for SelectDB シンクを構成します。
Kafka 設定ディレクトリに selectdb-sink.properties ファイルを作成し、ファイル内で以下の項目を設定します。
name=selectdb-sink connector.class=org.apache.doris.kafka.connector.DorisSinkConnector topics=test123.test.test_table doris.topic2table.map=test123.test.test_table:test_table buffer.count.records=10000 buffer.flush.time=120 buffer.size.bytes=5000000 doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com doris.http.port=8080 doris.query.port=9030 doris.user=admin doris.password=**** doris.database=test key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # オプション。デッドレターキューを構成します。 #errors.tolerance=all #errors.deadletterqueue.topic.name=test_error #errors.deadletterqueue.context.headers.enable = true #errors.deadletterqueue.topic.replication.factor=1説明ApsaraDB for SelectDB インスタンスにデータを同期する前に、インスタンスにデータベースとテーブルを作成する必要があります。
Kafka Connect を起動します。
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties説明Kafka Connect を起動した後、logs/connect.log ファイルを表示して、Kafka Connect が起動されているかどうかを確認できます。
高度な使用方法
コネクタを管理する
# コネクタのステータスをクエリします。
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# 現在のコネクタを削除します。
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# 現在のコネクタを一時停止します。
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# 現在のコネクタを再開します。
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# コネクタ内のタスクを再開します。
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST詳細については、「Confluent Platform の Kafka Connect REST インターフェース」をご参照ください。
配信不能キューを構成する
デフォルトでは、変換プロセスまたは変換プロセス中に発生するエラーが原因で、コネクタが失敗する可能性があります。 コネクタがエラーをスキップするように構成することで、このようなエラーを許容することもできます。 エラー、失敗した操作、および異常なデータレコードの詳細は、さまざまなレベルの詳細でデッドレターキューに書き込むことができます。
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1詳細については、「Connect のエラー報告」をご参照ください。
SSL 認証を使用して Kafka クラスターにアクセスする
Kafka Connect を使用して SSL 認証で Kafka クラスターにアクセスするには、Kafka ブローカーの公開鍵を認証するために使用される証明書ファイル client.truststore.jks を提供する必要があります。以下の構成を connect-distributed.properties ファイルに追加できます。
# Connect ワーカー
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
# シンクコネクタの埋め込みコンシューマー
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234詳細については、「Kafka Connect を構成する」をご参照ください。