All Products
Search
Document Center

ApsaraDB for SelectDB:Use Doris Kafka Connector to import data

Last Updated:Mar 28, 2026

Doris Kafka Connector lets ApsaraDB for SelectDB automatically subscribe to and consume data from Kafka topics. Once configured, it continuously reads records from the topics you specify and writes them to your SelectDB instance — no polling or manual triggering required.

Common ingestion patterns:

  • JSON data: Applications call API operations to write JSON records directly to Kafka topics. The connector consumes these records and writes them to SelectDB in near-real time.

  • Database change data capture (CDC): Debezium Connector captures row-level changes from MySQL, PostgreSQL, SQL Server, Oracle, or MongoDB and publishes them to Kafka topics in a unified format. The connector then writes those changes — including inserts, updates, and deletes — to SelectDB.

Compatibility

Connector versionKafka versionSelectDB versionJava version
1.0.02.4 or laterAll supported8 or later

Prerequisites

Before you begin, make sure you have:

  • An Apache Kafka cluster (version 2.4 or later) or a Confluent Cloud cluster

  • An ApsaraDB for SelectDB instance. See Create an instance

  • A database and table in the instance to receive data. See Connect to an instance

  • The doris-kafka-connector-1.0.0.jar file downloaded from Maven Central

Set up the connector

Kafka Connect supports two deployment modes: standalone and distributed.

Warning

Standalone mode is for development and testing only. Use distributed mode in production.

Standalone mode

Step 1: Install the connector plugin

Place the JAR file in the $KAFKA_HOME/libs directory:

cp doris-kafka-connector-1.0.0.jar $KAFKA_HOME/libs/

Step 2: Configure Kafka Connect

Edit $KAFKA_HOME/config/connect-standalone.properties:

# Broker address
bootstrap.servers=127.0.0.1:9092

# Path to the connector plugin directory
plugin.path=$KAFKA_HOME/libs

# Increase poll interval to prevent consumer group eviction during Stream Load writes.
# Stream Load can take longer than the default 5-minute limit, causing the consumer to be
# kicked out of the group. Set this to at least 30 minutes (1,800,000 ms).
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000

Step 3: Create a connector configuration file

Create $KAFKA_HOME/config/connect-selectdb-sink.properties:

name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8080
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

Step 4: Start Kafka Connect

$KAFKA_HOME/bin/connect-standalone.sh -daemon \
  $KAFKA_HOME/config/connect-standalone.properties \
  $KAFKA_HOME/config/connect-selectdb-sink.properties

Distributed mode

Distributed mode runs Kafka Connect as a cluster and manages connector configurations via REST API, making it suitable for production deployments.

Step 1: Install the connector plugin

Copy the JAR file to the $KAFKA_HOME/libs directory on each Kafka Connect worker node:

cp doris-kafka-connector-1.0.0.jar $KAFKA_HOME/libs/

Step 2: Configure Kafka Connect

Edit $KAFKA_HOME/config/connect-distributed.properties on each worker:

# Broker address
bootstrap.servers=127.0.0.1:9092

# Cluster identifier — must be the same on all workers in the cluster
group.id=connect-cluster

# Path to the connector plugin directory
plugin.path=$KAFKA_HOME/libs

# Increase poll interval to prevent consumer group eviction during Stream Load writes
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000

Step 3: Start Kafka Connect

$KAFKA_HOME/bin/connect-distributed.sh -daemon \
  $KAFKA_HOME/config/connect-distributed.properties

Step 4: Create a connector via the REST API

curl -i http://127.0.0.1:8083/connectors \
  -H "Content-Type: application/json" \
  -X POST \
  -d '{
    "name": "test-selectdb-sink-cluster",
    "config": {
      "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
      "topics": "topic_test",
      "doris.topic2table.map": "topic_test:test_kafka_tbl",
      "buffer.count.records": "10000",
      "buffer.flush.time": "120",
      "buffer.size.bytes": "5000000",
      "doris.urls": "selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
      "doris.http.port": "8080",
      "doris.query.port": "9030",
      "doris.user": "admin",
      "doris.password": "***",
      "doris.database": "test_db",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
  }'

Parameters

Required parameters

ParameterDescription
nameConnector name. Must be unique in the Kafka Connect cluster. Cannot contain ISO control characters.
connector.classSet to org.apache.doris.kafka.connector.DorisSinkConnector.
topicsKafka topics to consume from. Separate multiple topics with commas.
doris.urlsEndpoint of your ApsaraDB for SelectDB instance. Find it on the Instance Details page under Network Information. Example: selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.userUsername for connecting to the instance.
doris.passwordPassword for the username.
doris.databaseTarget database in the instance.

Buffer parameters

ParameterDefaultDescription
buffer.count.records10000Number of records buffered per Kafka partition before flushing to SelectDB.
buffer.flush.time120Buffer flush interval, in seconds.
buffer.size.bytes5000000Maximum buffer size per Kafka partition, in bytes.

Port parameters

ParameterDefaultDescription
doris.http.port8080HTTP port of the SelectDB instance. Used for Stream Load writes.
doris.query.port9030MySQL-compatible query port of the SelectDB instance.

Converter parameters

ParameterRequiredDescription
key.converterYesConverter class for message keys. Use org.apache.kafka.connect.storage.StringConverter for string keys or org.apache.kafka.connect.json.JsonConverter for JSON keys.
value.converterYesConverter class for message values. Use org.apache.kafka.connect.json.JsonConverter for JSON-formatted values.

Advanced parameters

ParameterDefaultValid valuesDescription
doris.topic2table.map(same as topic name)topic1:table1,topic2:table2Maps Kafka topics to SelectDB tables. If left blank, the topic name is used as the table name.
load.modelstream_loadstream_load, copy_intoLoad method. stream_load writes directly to SelectDB. copy_into stages data in OSS first, then loads it — required for exactly_once delivery.
delivery.guaranteeat_least_onceat_least_once, exactly_onceData consistency guarantee. Set to exactly_once only when load.model=copy_into.
enable.2pctrue, falseEnables two-phase commit for exactly-once semantics.
enable.deletefalsetrue, falseDeletes rows in SelectDB when the source record is deleted in Kafka.
label.prefix(connector name)Any stringLabel prefix for Stream Load imports.
auto.redirecttrue, falseRedirects Stream Load requests directly to the backend (BE) node handling the write, bypassing the frontend (FE).
sink.properties.*Stream Load parameters. For example, sink.properties.column_separator sets the column delimiter. See Import data by using Stream Load.
jmxtruetrue, falseEnables Java Management Extensions (JMX) monitoring. See Use JMX to monitor Doris Kafka Connector.
For a full list of Kafka Connect Sink configuration options, see Configuring connectors in the Kafka 3.7 documentation.

Examples

Example 1: Import JSON data

This example sets up a standalone connector that reads JSON records from a Kafka topic and writes them to a SelectDB table.

Prepare the environment

  1. Start a Kafka cluster (version 2.4.0 or later). This example uses a standalone Kafka cluster:

    wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
    tar -zxvf kafka_2.12-2.4.0.tgz
    cd kafka_2.12-2.4.0/
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    bin/kafka-server-start.sh -daemon config/server.properties
  2. Install the connector plugin:

    cp doris-kafka-connector-1.0.0.jar $KAFKA_HOME/libs/
  3. Create a test database and table in your SelectDB instance:

    CREATE DATABASE test_db;
    USE test_db;
    CREATE TABLE employees (
        emp_no     INT          NOT NULL,
        birth_date DATE,
        first_name VARCHAR(20),
        last_name  VARCHAR(20),
        gender     CHAR(2),
        hire_date  DATE
    )
    UNIQUE KEY(`emp_no`)
    DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

Configure and start the connector

Create $KAFKA_HOME/config/selectdb-sink.properties:

name=selectdb_sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=test_topic
doris.topic2table.map=test_topic:employees
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8080
doris.query.port=9030
doris.user=admin
doris.password=***
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Optional: route failed records to a dead-letter queue instead of stopping the connector
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1

Start Kafka Connect:

bin/connect-standalone.sh -daemon \
  config/connect-standalone.properties \
  config/selectdb-sink.properties

Verify the import

Produce a few test records to the topic:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic
> {"emp_no":1,"birth_date":"1990-01-01","first_name":"Alice","last_name":"Smith","gender":"F","hire_date":"2020-06-15"}
> {"emp_no":2,"birth_date":"1985-03-22","first_name":"Bob","last_name":"Jones","gender":"M","hire_date":"2019-11-01"}

After the buffer flushes (within buffer.flush.time seconds), query the table in SelectDB:

SELECT * FROM test_db.employees;

Expected output:

+--------+------------+------------+-----------+--------+------------+
| emp_no | birth_date | first_name | last_name | gender | hire_date  |
+--------+------------+------------+-----------+--------+------------+
|      1 | 1990-01-01 | Alice      | Smith     | F      | 2020-06-15 |
|      2 | 1985-03-22 | Bob        | Jones     | M      | 2019-11-01 |
+--------+------------+------------+-----------+--------+------------+

Example 2: Sync MySQL data using Debezium Connector

Use Debezium Connector to capture changes from a MySQL database and stream them to SelectDB in real time. Debezium publishes each row-level change — insert, update, or delete — to a Kafka topic. The SelectDB connector then consumes those events and applies them to the target table.

Create the target database and table in your SelectDB instance before starting the sync.

Step 1: Download and install Debezium Connector

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gz

# Copy all extracted JARs to the libs directory
cp debezium-connector-mysql/*.jar $KAFKA_HOME/libs/

Step 2: Configure the MySQL source connector

Create $KAFKA_HOME/config/mysql-source.properties:

name=mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com
database.port=3306
database.user=testuser
database.password=****
database.server.id=1
# Unique identifier for this connector in Kafka
database.server.name=test123
# Databases and tables to sync (default: all)
database.include.list=test
table.include.list=test.test_table
database.history.kafka.bootstrap.servers=localhost:9092
# Kafka topic for schema change history
database.history.kafka.topic=dbhistory
transforms=unwrap
# Flatten CDC events to simple key-value records
# See https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Include deleted row state for DELETE events
transforms.unwrap.delete.handling.mode=rewrite

After starting, Debezium names topics using the format SERVER_NAME.DATABASE_NAME.TABLE_NAME — in this example, test123.test.test_table.

For the full Debezium MySQL connector configuration reference, see Debezium connector for MySQL.

Step 3: Configure the SelectDB sink connector

Create $KAFKA_HOME/config/selectdb-sink.properties:

name=selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=test123.test.test_table
doris.topic2table.map=test123.test.test_table:test_table
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8080
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Optional: dead-letter queue for failed records
#errors.tolerance=all
#errors.deadletterqueue.topic.name=test_error
#errors.deadletterqueue.context.headers.enable=true
#errors.deadletterqueue.topic.replication.factor=1

Step 4: Start Kafka Connect

bin/connect-standalone.sh -daemon \
  config/connect-standalone.properties \
  config/mysql-source.properties \
  config/selectdb-sink.properties

Check logs/connect.log to confirm Kafka Connect started successfully.

Manage connectors

In distributed mode, use the Kafka Connect REST API to manage connector lifecycle:

# Check connector status
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET

# Pause a connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT

# Resume a paused connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT

# Restart a specific task
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST

# Delete a connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE

For the full REST API reference, see Kafka Connect REST interface for Confluent Platform.

Advanced configuration

Configure a dead-letter queue

By default, a record conversion error stops the connector. To keep the connector running and route failed records to a separate topic instead, add the following to your connector configuration:

errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1

For more details, see Error reporting in Connect.

Connect to a Kafka cluster with SSL

To connect to a Kafka cluster that requires SSL authentication, add the following to connect-distributed.properties. The client.truststore.jks file authenticates the public key of the Kafka broker.

# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234

# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234

For more information, see Configure Kafka Connect with SSL.

Troubleshooting

Consumer is kicked out of the consumer group

Error:

org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed
since the consumer is not part of an active group...

This happens when Stream Load takes longer than the Kafka poll interval (default: 5 minutes), causing the consumer to be evicted from the group.

Fix: Increase max.poll.interval.ms in connect-standalone.properties or connect-distributed.properties:

max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000

JsonConverter schema error when reading JSON data

Error:

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable
requires "schema" and "payload" fields and may not contain additional fields.

This happens when JsonConverter is configured with schemas.enable=true (the default) but your Kafka messages contain plain JSON without a schema wrapper.

Fix: Disable schema enforcement in your connector configuration:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter.schemas.enable=false

What's next