すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:Doris Kafka Connector を使用してデータをインポートする

最終更新日:Apr 09, 2025

ApsaraDB for SelectDB は、Doris Kafka Connector を使用して Kafka のデータに自動的にサブスクライブし、Kafka からデータを同期できます。このトピックでは、Doris Kafka Connector を使用して Kafka データソースから ApsaraDB for SelectDB インスタンスにデータを同期する方法について説明します。

背景情報

Kafka Connect は、Apache Kafka と他のシステム間でデータを転送するための信頼性の高いツールです。Kafka データソースとの間でデータをインポートまたはエクスポートするコネクタを定義できます。

Doris Kafka Connector は、Apache Doris によって提供され、Kafka Connect クラスタで実行されます。Doris Kafka Connector は、Kafka Topic からデータを読み取り、ApsaraDB for SelectDB に書き込むことができます。

ビジネスシナリオでは、Debezium Connector を使用してデータベース内の更新データを Kafka にプッシュしたり、API 操作を呼び出して JSON 形式のデータを Kafka にリアルタイムで書き込んだりできます。その後、Doris Kafka Connector を使用して Kafka のデータに自動的にサブスクライブし、ApsaraDB for SelectDB インスタンスにデータを同期できます。

Kafka Connect の動作モード

Kafka Connect は、スタンドアロンモードと分散モードを提供します。ビジネス要件に基づいて操作モードを使用できます。

スタンドアロンモード

警告

本番環境ではスタンドアロンモードを使用しないことをお勧めします。

スタンドアロンモードを構成する

connect-standalone.properties ファイルを構成します。

# ブローカーの IP アドレスを変更します。
bootstrap.servers=127.0.0.1:9092

Kafka 設定ディレクトリに connect-selectdb-sink.properties ファイルを作成し、ファイル内で以下の項目を設定します。

name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8030
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

スタンドアロンモードを有効にする

$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties

分散モード

分散モードを構成する

connect-distributed.properties ファイルを構成します。

# ブローカーの IP アドレスを変更します。
bootstrap.servers=127.0.0.1:9092

# グループ ID を変更します。グループ ID は、同じクラスタ内で一貫している必要があります。
group.id=connect-cluster

分散モードを有効にする

$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

コネクタを構成する

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name":"test-selectdb-sink-cluster",
  "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "topics":"topic_test",
    "doris.topic2table.map": "topic_test:test_kafka_tbl",
    "buffer.count.records":"10000",
    "buffer.flush.time":"120",
    "buffer.size.bytes":"5000000",
    "doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
    "doris.user":"admin",
    "doris.password":"***",
    "doris.database":"test_db",
    "doris.http.port":"8080",
    "doris.query.port":"9030",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter"
  }
}'

パラメーター

パラメーター

説明

名前

コネクタの名前。ほとんどの場合、名前は ISO 制御文字を含まない文字列であり、Kafka Connect クラスター内で一意である必要があります。

connector.class

コネクタのクラス名またはエイリアス。値を com.selectdb.kafka.connector.SelectdbSinkConnector に設定します。

トピック

トピック (データソースとして機能する) の名前。複数のトピック名を指定する場合は、コンマ (,) で区切ります。

doris.topic2table.map

トピックとテーブル間のマッピング。複数のマッピング関係はコンマ (,) で区切ります。例: topic1:tb1,topic2:tb2。デフォルトでは、このパラメーターは空のままです。これは、トピックとテーブルの名前が同じであることを示します。

buffer.count.records

Kafka パーティションごとに、データが ApsaraDB for SelectDB インスタンスにフラッシュされる前にメモリにバッファリングされるデータレコードの数です。デフォルト値:10000。

buffer.flush.time

メモリ内のバッファーがリフレッシュされる間隔です。単位:秒。既定値:120。

buffer.size.bytes

Kafka パーティションごとにメモリにバッファリングされるデータレコードの累積サイズです。単位:バイト。既定値:5000000。

doris.urls

ApsaraDB for SelectDB インスタンスのエンドポイント。

ApsaraDB for SelectDB インスタンスのエンドポイントを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンします。インスタンスの[インスタンスの詳細] ページに移動し、[ネットワーク情報] セクションでエンドポイントを表示します。

selectdb-cn4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

doris.http.ポート

ApsaraDB for SelectDB インスタンスの HTTP ポート番号。デフォルト値:8080。

doris.query.ポート

ApsaraDB for SelectDB インスタンスの MySQL ポート番号。デフォルト値:9030。

doris.user

ApsaraDB for SelectDB インスタンスに接続するために使用するユーザー名。

doris.password

ApsaraDB for SelectDB インスタンスに接続するために使用するユーザー名のパスワード。

doris データベース

ApsaraDB for SelectDB インスタンスでデータを書き込むデータベースの名前。

key.converter

コンバーター クラスは、JSON 形式のデータ内の特定のキーを変換するためのものです。

value.converter

JSON 形式のデータの特定の値を変換するためのコンバータークラスです。

jmx

Doris Kafka Connector を監視するために Java Management Extensions (JMX) を使用するかどうかを指定します。詳細については、「JMX を使用して Doris Kafka Connector を監視する」をご参照ください。デフォルト値:TRUE。

有効。削除

同時にレコードを削除するかどうかを指定します。既定値: false。

プレフィックス

Stream Load を使用してデータをインポートするときに使用される label のプレフィックス。デフォルト値は、コネクタの名前です。

auto.redirect

Stream Load リクエストをリダイレクトするかどうかを指定します。自動リダイレクト機能を有効にすると、Stream Load リクエストは、フロントエンド ( FE ) 経由でデータが書き込まれるバックエンド ( BE ) にリダイレクトされます。 BE 情報は表示されなくなります。

load.model

データのインポート モード。有効な値:

  • stream_load: データは ApsaraDB for SelectDB に直接インポートされます。

  • copy_into

Default value: stream_load です。

sink.properties.*

Stream Load を使用してデータをインポートするためのパラメーター。

たとえば、sink.properties.column_separator パラメーターは、列区切り文字を指定します。

詳細については、「Stream Load を使用してデータをインポートする」をご参照ください。

配信保証

消費された Kafka データを ApsaraDB for SelectDB にインポートする際にデータ整合性を確保するために使用されるメソッド。有効な値:at_least_once および exactly_once。デフォルト値:at_least_once

exactly_once にこのパラメーターを設定できるのは、load.model パラメーターを copy_into に設定した場合のみです。ApsaraDB for SelectDB

enable.2pc

2 フェーズコミットモードを有効にするかどうかを指定します。 1 回限りのセマンティクスを確保するために、2 フェーズコミットモードを有効にすることができます。

説明

Kafka Connect Sink の他の一般的な設定項目の詳細については、Kafka 3.7 ドキュメントの[コネクタの設定]セクションをご参照ください。

環境を準備する

  1. Apache Kafka クラスター (バージョン 2.4.0 以降) または Confluent Cloud をインストールします。この例では、スタンドアロンの Kafka クラスターを使用します。

    # パッケージをダウンロードして解凍します。
    wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
    tar -zxvf kafka_2.12-2.4.0.tgz
    cd kafka_2.12-2.4.0/
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    bin/kafka-server-start.sh -daemon config/server.properties
  2. doris-kafka-connector-1.0.0.jar パッケージをダウンロードし、JAR パッケージを KAKFA_HOME/libs ディレクトリに保存します。

  3. ApsaraDB for SelectDB インスタンスを作成します。詳細については、「インスタンスを作成する」をご参照ください。

  4. ApsaraDB for SelectDBインスタンスへの接続 インスタンスに MySQL プロトコル経由で接続します。詳細については、「」をご参照ください。

  5. テスト データベースとテスト テーブルを作成します。

    1. Execute the following statement to create a test database: 次のステートメントを実行して、テストデータベースを作成します。

      CREATE DATABASE test_db;
    2. Execute the following statements to create a test table: 次のステートメントを実行して、テストテーブルを作成します。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

例 1: JSON 形式のデータを同期する

  1. SelectDB シンク用の ApsaraDB を構成します。

    この例では、スタンドアロンモードを使用します。 Kafka 設定ディレクトリに selectdb-sink.properties ファイルを作成し、ファイル内で以下の項目を設定します。

    name=selectdb_sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test_topic
    doris.topic2table.map=test_topic:example_tbl
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=***
    doris.database=test_db
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    # オプション。デッドレターキューを構成します。
    errors.tolerance=all
    errors.deadletterqueue.topic.name=test_error
    errors.deadletterqueue.context.headers.enable = true
    errors.deadletterqueue.topic.replication.factor=1
  2. Kafka Connect を起動します。

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties

例 2: Debezium Connector を使用して MySQL データベースから ApsaraDB for SelectDB インスタンスにデータを同期する

ビジネス シナリオによっては、ビジネス データベースからデータをリアルタイムで同期する必要があります。この場合、変更データキャプチャ (CDC) メカニズムが必要です。

Debezium Connector は、Kafka Connect に基づいて開発された CDC ツールです。Debezium Connector は、MySQL、PostgreSQL、SQL Server、Oracle、MongoDB など、さまざまなデータベースに接続し、データベースから Kafka Topic に統一されたフォーマットで継続的にデータを送信して、ダウンストリーム シンクがリアルタイムで消費できるようにします。この例では、MySQL データソースを使用します。

  1. Debezium Connector パッケージをダウンロードします。

    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  2. ダウンロードしたパッケージを解凍します。

    tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  3. KAKFA_HOME/libs ディレクトリにすべての抽出された JAR パッケージを格納します。

  4. MySQL データソースを構成します。

    Kafka 設定ディレクトリに mysql-source.properties ファイルを作成し、ファイル内で以下の項目を設定します。

    name=mysql-source
    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com
    database.port=3306
    database.user=testuser
    database.password=****
    database.server.id=1
    # Kafka 内のクライアントの一意の識別子。
    database.server.name=test123
    # データが同期されるデータベースとテーブル。デフォルトでは、すべてのデータベースとテーブルからデータが同期されます。
    database.include.list=test
    table.include.list=test.test_table
    database.history.kafka.bootstrap.servers=localhost:9092
    # データベースとテーブルのスキーマ変更イベントを保存するために使用される Kafka トピック。
    database.history.kafka.topic=dbhistory
    transforms=unwrap
    # 詳細については、https://debezium.io/documentation/reference/stable/transformations/event-flattening.html をご覧ください。
    transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
    # DELETE 操作のレコード変更イベント。
    transforms.unwrap.delete.handling.mode=rewrite

    構成が完了すると、Kafka Topic はデフォルトで SERVER_NAME.DATABASE_NAME.TABLE_NAME の形式で命名されます。

    説明

    詳細については、「MySQL 用 Debezium コネクタ」をご参照ください。

  5. ApsaraDB for SelectDB シンクを構成します。

    Kafka 設定ディレクトリに selectdb-sink.properties ファイルを作成し、ファイル内で以下の項目を設定します。

    name=selectdb-sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test123.test.test_table
    doris.topic2table.map=test123.test.test_table:test_table
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=****
    doris.database=test
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    # オプション。デッドレターキューを構成します。
    #errors.tolerance=all
    #errors.deadletterqueue.topic.name=test_error
    #errors.deadletterqueue.context.headers.enable = true
    #errors.deadletterqueue.topic.replication.factor=1
    説明

    ApsaraDB for SelectDB インスタンスにデータを同期する前に、インスタンスにデータベースとテーブルを作成する必要があります。

  6. Kafka Connect を起動します。

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties
    説明

    Kafka Connect を起動した後、logs/connect.log ファイルを表示して、Kafka Connect が起動されているかどうかを確認できます。

高度な使用方法

コネクタを管理する

# コネクタのステータスをクエリします。
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# 現在のコネクタを削除します。
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# 現在のコネクタを一時停止します。
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# 現在のコネクタを再開します。
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# コネクタ内のタスクを再開します。
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST

詳細については、「Confluent Platform の Kafka Connect REST インターフェース」をご参照ください。

配信不能キューを構成する

デフォルトでは、変換プロセスまたは変換プロセス中に発生するエラーが原因で、コネクタが失敗する可能性があります。 コネクタがエラーをスキップするように構成することで、このようなエラーを許容することもできます。 エラー、失敗した操作、および異常なデータレコードの詳細は、さまざまなレベルの詳細でデッドレターキューに書き込むことができます。

errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1

詳細については、「Connect のエラー報告」をご参照ください。

SSL 認証を使用して Kafka クラスターにアクセスする

Kafka Connect を使用して SSL 認証で Kafka クラスターにアクセスするには、Kafka ブローカーの公開鍵を認証するために使用される証明書ファイル client.truststore.jks を提供する必要があります。以下の構成を connect-distributed.properties ファイルに追加できます。

# Connect ワーカー
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234

# シンクコネクタの埋め込みコンシューマー
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234

詳細については、「Kafka Connect を構成する」をご参照ください。