Doris Kafka Connector lets ApsaraDB for SelectDB automatically subscribe to and consume data from Kafka topics. Once configured, it continuously reads records from the topics you specify and writes them to your SelectDB instance — no polling or manual triggering required.
Common ingestion patterns:
JSON data: Applications call API operations to write JSON records directly to Kafka topics. The connector consumes these records and writes them to SelectDB in near-real time.
Database change data capture (CDC): Debezium Connector captures row-level changes from MySQL, PostgreSQL, SQL Server, Oracle, or MongoDB and publishes them to Kafka topics in a unified format. The connector then writes those changes — including inserts, updates, and deletes — to SelectDB.
Compatibility
| Connector version | Kafka version | SelectDB version | Java version |
|---|---|---|---|
| 1.0.0 | 2.4 or later | All supported | 8 or later |
Prerequisites
Before you begin, make sure you have:
An Apache Kafka cluster (version 2.4 or later) or a Confluent Cloud cluster
An ApsaraDB for SelectDB instance. See Create an instance
A database and table in the instance to receive data. See Connect to an instance
The doris-kafka-connector-1.0.0.jar file downloaded from Maven Central
Set up the connector
Kafka Connect supports two deployment modes: standalone and distributed.
Standalone mode is for development and testing only. Use distributed mode in production.
Standalone mode
Step 1: Install the connector plugin
Place the JAR file in the $KAFKA_HOME/libs directory:
cp doris-kafka-connector-1.0.0.jar $KAFKA_HOME/libs/
Step 2: Configure Kafka Connect
Edit $KAFKA_HOME/config/connect-standalone.properties:
# Broker address
bootstrap.servers=127.0.0.1:9092
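# Path to the connector plugin directory. Properties files do not expand
# environment variables, so replace $KAFKA_HOME with the absolute path.
plugin.path=$KAFKA_HOME/libs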
# Increase poll interval to prevent consumer group eviction during Stream Load writes.
# Stream Load can take longer than the default 5-minute limit, causing the consumer to be
# kicked out of the group. Set this to at least 30 minutes (1,800,000 ms).
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000
Step 3: Create a connector configuration file
Create $KAFKA_HOME/config/connect-selectdb-sink.properties:
name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8080
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
Step 4: Start Kafka Connect
$KAFKA_HOME/bin/connect-standalone.sh -daemon \
$KAFKA_HOME/config/connect-standalone.properties \
$KAFKA_HOME/config/connect-selectdb-sink.properties
Distributed mode
Distributed mode runs Kafka Connect as a cluster and manages connector configurations via REST API, making it suitable for production deployments.
Step 1: Install the connector plugin
Copy the JAR file to the $KAFKA_HOME/libs directory on each Kafka Connect worker node:
cp doris-kafka-connector-1.0.0.jar $KAFKA_HOME/libs/
Step 2: Configure Kafka Connect
Edit $KAFKA_HOME/config/connect-distributed.properties on each worker:
# Broker address
bootstrap.servers=127.0.0.1:9092
# Cluster identifier — must be the same on all workers in the cluster
group.id=connect-cluster
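# Path to the connector plugin directory. Properties files do not expand
# environment variables, so replace $KAFKA_HOME with the absolute path.
plugin.path=$KAFKA_HOME/libs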
# Increase poll interval to prevent consumer group eviction during Stream Load writes
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000
Step 3: Start Kafka Connect
$KAFKA_HOME/bin/connect-distributed.sh -daemon \
$KAFKA_HOME/config/connect-distributed.properties
Step 4: Create a connector via the REST API
curl -i http://127.0.0.1:8083/connectors \
-H "Content-Type: application/json" \
-X POST \
-d '{
"name": "test-selectdb-sink-cluster",
"config": {
"connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
"topics": "topic_test",
"doris.topic2table.map": "topic_test:test_kafka_tbl",
"buffer.count.records": "10000",
"buffer.flush.time": "120",
"buffer.size.bytes": "5000000",
"doris.urls": "selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
"doris.http.port": "8080",
"doris.query.port": "9030",
"doris.user": "admin",
"doris.password": "***",
"doris.database": "test_db",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}'
Parameters
Required parameters
| Parameter | Description |
|---|---|
| name | Connector name. Must be unique in the Kafka Connect cluster. Cannot contain ISO control characters. |
| connector.class | Set to org.apache.doris.kafka.connector.DorisSinkConnector. |
| topics | Kafka topics to consume from. Separate multiple topics with commas. |
| doris.urls | Endpoint of your ApsaraDB for SelectDB instance. Find it on the Instance Details page under Network Information. Example: selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com |
| doris.user | Username for connecting to the instance. |
| doris.password | Password for the username. |
| doris.database | Target database in the instance. |
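Before starting a worker, it can help to confirm that a connector properties file sets every required parameter. The following is a minimal sketch in POSIX shell. The function name missing_required_keys is hypothetical and not part of the connector; the key list is copied from the table above.

```shell
# Print each required connector key that is absent from a properties file.
# Commented-out lines (starting with #) do not count as set.
missing_required_keys() (
  file=$1
  for key in name connector.class topics doris.urls \
             doris.user doris.password doris.database; do
    awk -F= -v k="$key" '
      /^[ \t]*#/ { next }                       # skip comment lines
      { name = $1; gsub(/^[ \t]+|[ \t]+$/, "", name) }
      name == k { found = 1 }                   # exact key match
      END { exit found ? 0 : 1 }
    ' "$file" || echo "$key"
  done
)
```

Running missing_required_keys on a complete file prints nothing; any output lists the keys that still need a value.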
Buffer parameters
| Parameter | Default | Description |
|---|---|---|
| buffer.count.records | 10000 | Number of records buffered per Kafka partition before flushing to SelectDB. |
| buffer.flush.time | 120 | Buffer flush interval, in seconds. |
| buffer.size.bytes | 5000000 | Maximum buffer size per Kafka partition, in bytes. |
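One way to read these three settings together is that a partition's buffer is flushed as soon as any one threshold is reached. The sketch below encodes that reading as a shell predicate. It is an assumption about how the thresholds interact, not the connector's actual code, and the function name should_flush is hypothetical.

```shell
# Succeed (exit 0) when any buffer threshold is reached.
# Arguments: buffered record count, buffered bytes, seconds since last flush.
# Optional arguments 4-6 override the defaults from the table above.
# Assumption: the first threshold hit triggers the flush.
should_flush() (
  records=$1 bytes=$2 elapsed_s=$3
  max_records=${4:-10000} max_bytes=${5:-5000000} max_seconds=${6:-120}
  [ "$records" -ge "$max_records" ] ||
  [ "$bytes" -ge "$max_bytes" ] ||
  [ "$elapsed_s" -ge "$max_seconds" ]
)
```

Under this reading, a low-traffic topic is bounded by buffer.flush.time, while a high-traffic topic usually reaches the record or byte limit first.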
Port parameters
| Parameter | Default | Description |
|---|---|---|
| doris.http.port | 8080 | HTTP port of the SelectDB instance. Used for Stream Load writes. |
| doris.query.port | 9030 | MySQL-compatible query port of the SelectDB instance. |
Converter parameters
| Parameter | Required | Description |
|---|---|---|
| key.converter | Yes | Converter class for message keys. Use org.apache.kafka.connect.storage.StringConverter for string keys or org.apache.kafka.connect.json.JsonConverter for JSON keys. |
| value.converter | Yes | Converter class for message values. Use org.apache.kafka.connect.json.JsonConverter for JSON-formatted values. |
Advanced parameters
| Parameter | Default | Valid values | Description |
|---|---|---|---|
| doris.topic2table.map | (same as topic name) | topic1:table1,topic2:table2 | Maps Kafka topics to SelectDB tables. If left blank, the topic name is used as the table name. |
| load.model | stream_load | stream_load, copy_into | Load method. stream_load writes directly to SelectDB. copy_into stages data in OSS first, then loads it; this method is required for exactly_once delivery. |
| delivery.guarantee | at_least_once | at_least_once, exactly_once | Data consistency guarantee. Set to exactly_once only when load.model=copy_into. |
| enable.2pc | - | true, false | Enables two-phase commit for exactly-once semantics. |
| enable.delete | false | true, false | Deletes rows in SelectDB when the source record is deleted in Kafka. |
| label.prefix | (connector name) | Any string | Label prefix for Stream Load imports. |
| auto.redirect | - | true, false | Redirects Stream Load requests directly to the backend (BE) node handling the write, bypassing the frontend (FE). |
| sink.properties.* | - | - | Stream Load parameters. For example, sink.properties.column_separator sets the column delimiter. See Import data by using Stream Load. |
| jmx | true | true, false | Enables Java Management Extensions (JMX) monitoring. See Use JMX to monitor Doris Kafka Connector. |
For a full list of Kafka Connect Sink configuration options, see Configuring connectors in the Kafka 3.7 documentation.
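The doris.topic2table.map format and its fallback can be illustrated with a small lookup. This is a sketch of the mapping rule as the table describes it, not the connector's implementation; table_for_topic is a hypothetical helper name.

```shell
# Resolve the target table for a topic from a doris.topic2table.map value.
# Falls back to the topic name when the map is empty or has no entry.
table_for_topic() (
  map=$1 topic=$2
  set -f            # disable filename expansion while splitting the map
  IFS=,             # entries are comma-separated
  for pair in $map; do
    # Everything before the first colon is the topic, the rest is the table.
    if [ "${pair%%:*}" = "$topic" ]; then
      echo "${pair#*:}"
      exit 0
    fi
  done
  echo "$topic"
)

table_for_topic 'topic_test:test_kafka_tbl' topic_test   # prints test_kafka_tbl
table_for_topic '' orders                                # prints orders
```

Topic names that contain dots, such as those produced by Debezium, work unchanged because only commas and the first colon are treated as separators.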
Examples
Example 1: Import JSON data
This example sets up a standalone connector that reads JSON records from a Kafka topic and writes them to a SelectDB table.
Prepare the environment
Start a Kafka cluster (version 2.4.0 or later). This example uses a standalone Kafka cluster:
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -zxvf kafka_2.12-2.4.0.tgz
cd kafka_2.12-2.4.0/
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
Install the connector plugin:
cp doris-kafka-connector-1.0.0.jar $KAFKA_HOME/libs/
Create a test database and table in your SelectDB instance:
CREATE DATABASE test_db;
USE test_db;
CREATE TABLE employees (
    emp_no INT NOT NULL,
    birth_date DATE,
    first_name VARCHAR(20),
    last_name VARCHAR(20),
    gender CHAR(2),
    hire_date DATE
)
UNIQUE KEY(`emp_no`)
DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
Configure and start the connector
Create $KAFKA_HOME/config/selectdb-sink.properties:
name=selectdb_sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=test_topic
doris.topic2table.map=test_topic:employees
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8080
doris.query.port=9030
doris.user=admin
doris.password=***
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
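value.converter=org.apache.kafka.connect.json.JsonConverter
# The test records in this example are plain JSON without a schema wrapper
value.converter.schemas.enable=false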
# Optional: route failed records to a dead-letter queue instead of stopping the connector
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1
Start Kafka Connect:
bin/connect-standalone.sh -daemon \
config/connect-standalone.properties \
config/selectdb-sink.properties
Verify the import
Produce a few test records to the topic:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic
> {"emp_no":1,"birth_date":"1990-01-01","first_name":"Alice","last_name":"Smith","gender":"F","hire_date":"2020-06-15"}
> {"emp_no":2,"birth_date":"1985-03-22","first_name":"Bob","last_name":"Jones","gender":"M","hire_date":"2019-11-01"}
After the buffer flushes (within buffer.flush.time seconds), query the table in SelectDB:
SELECT * FROM test_db.employees;
Expected output:
+--------+------------+------------+-----------+--------+------------+
| emp_no | birth_date | first_name | last_name | gender | hire_date  |
+--------+------------+------------+-----------+--------+------------+
|      1 | 1990-01-01 | Alice      | Smith     | F      | 2020-06-15 |
|      2 | 1985-03-22 | Bob        | Jones     | M      | 2019-11-01 |
+--------+------------+------------+-----------+--------+------------+
Example 2: Sync MySQL data using Debezium Connector
Use Debezium Connector to capture changes from a MySQL database and stream them to SelectDB in real time. Debezium publishes each row-level change — insert, update, or delete — to a Kafka topic. The SelectDB connector then consumes those events and applies them to the target table.
Create the target database and table in your SelectDB instance before starting the sync.
Step 1: Download and install Debezium Connector
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
# Copy all extracted JARs to the libs directory
cp debezium-connector-mysql/*.jar $KAFKA_HOME/libs/
Step 2: Configure the MySQL source connector
Create $KAFKA_HOME/config/mysql-source.properties:
name=mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com
database.port=3306
database.user=testuser
database.password=****
database.server.id=1
# Unique identifier for this connector in Kafka
database.server.name=test123
# Databases and tables to sync (default: all)
database.include.list=test
table.include.list=test.test_table
database.history.kafka.bootstrap.servers=localhost:9092
# Kafka topic for schema change history
database.history.kafka.topic=dbhistory
transforms=unwrap
# Flatten CDC events to simple key-value records
# See https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Include deleted row state for DELETE events
transforms.unwrap.delete.handling.mode=rewrite
After starting, Debezium names topics using the format SERVER_NAME.DATABASE_NAME.TABLE_NAME. In this example, the topic is test123.test.test_table.
For the full Debezium MySQL connector configuration reference, see Debezium connector for MySQL.
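Because the topic name follows a fixed pattern, the topics and doris.topic2table.map values for the sink connector can be derived from the Debezium settings. The following sketch assumes each SelectDB table has the same name as its MySQL source table; sink_topic_lines is a hypothetical helper, not part of either connector.

```shell
# Print the "topics" and "doris.topic2table.map" lines for a sink connector.
# Usage: sink_topic_lines SERVER_NAME DATABASE_NAME TABLE [TABLE...]
# Assumption: target tables in SelectDB reuse the MySQL table names.
sink_topic_lines() (
  server=$1 db=$2
  shift 2
  topics= map=
  for table in "$@"; do
    topic="$server.$db.$table"               # Debezium topic naming pattern
    topics="${topics:+$topics,}$topic"       # comma-join topics
    map="${map:+$map,}$topic:$table"         # comma-join topic:table pairs
  done
  printf 'topics=%s\n' "$topics"
  printf 'doris.topic2table.map=%s\n' "$map"
)

sink_topic_lines test123 test test_table
```

For the values used in this example, the two printed lines match the topics and doris.topic2table.map entries in the sink configuration.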
Step 3: Configure the SelectDB sink connector
Create $KAFKA_HOME/config/selectdb-sink.properties:
name=selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=test123.test.test_table
doris.topic2table.map=test123.test.test_table:test_table
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8080
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Optional: dead-letter queue for failed records
#errors.tolerance=all
#errors.deadletterqueue.topic.name=test_error
#errors.deadletterqueue.context.headers.enable=true
#errors.deadletterqueue.topic.replication.factor=1
Step 4: Start Kafka Connect
bin/connect-standalone.sh -daemon \
config/connect-standalone.properties \
config/mysql-source.properties \
config/selectdb-sink.properties
Check logs/connect.log to confirm Kafka Connect started successfully.
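Because the worker starts in the background with -daemon, a script that continues with further steps may need to wait until the worker responds. The following is a minimal retry helper, offered as a sketch; wait_until is a hypothetical name, and the probe command is supplied by the caller.

```shell
# Retry a command until it succeeds or the attempts run out.
# Usage: wait_until ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Returns 0 on the first success, 1 if every attempt fails.
wait_until() (
  attempts=$1 delay=$2
  shift 2
  i=0
  while [ "$i" -lt "$attempts" ]; do
    if "$@" >/dev/null 2>&1; then
      exit 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  exit 1
)
```

For example, wait_until 30 2 curl -sf http://127.0.0.1:8083/connectors retries for about a minute, assuming the worker's REST listener uses the default port 8083.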
Manage connectors
In distributed mode, use the Kafka Connect REST API to manage connector lifecycle:
# Check connector status
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# Pause a connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# Resume a paused connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# Restart a specific task
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST
# Delete a connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
For the full REST API reference, see Kafka Connect REST interface for Confluent Platform.
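The status endpoint returns JSON that reports a state for the connector and for each task, which makes it possible to detect failed tasks from a script. The sketch below counts states with grep and assumes the compact JSON that the worker returns; a JSON parser such as jq is more robust where available. The function name count_state and the sample response are illustrative.

```shell
# Count how many times a given state appears in a /status response read
# from stdin. Assumes compact JSON with no spaces around the colon.
count_state() {
  grep -o "\"state\":\"$1\"" | wc -l | tr -d ' '
}

# Illustrative response; the worker_id values and trace text are made up.
sample='{"name":"test-selectdb-sink-cluster","connector":{"state":"RUNNING","worker_id":"127.0.0.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"127.0.0.1:8083"},{"id":1,"state":"FAILED","worker_id":"127.0.0.1:8083","trace":"..."}],"type":"sink"}'

printf '%s\n' "$sample" | count_state FAILED   # prints 1
```

Against a live worker, pipe the output of curl -s (without -i, so that headers are not included) into count_state FAILED, and restart the affected task when the count is greater than zero.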
Advanced configuration
Configure a dead-letter queue
By default, a record conversion error stops the connector. To keep the connector running and route failed records to a separate topic instead, add the following to your connector configuration:
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1
For more details, see Error reporting in Connect.
Connect to a Kafka cluster with SSL
To connect to a Kafka cluster that requires SSL authentication, add the following to connect-distributed.properties. The client.truststore.jks truststore holds the certificates that the client uses to verify the Kafka broker's certificate.
# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234
For more information, see Configure Kafka Connect with SSL.
Troubleshooting
Consumer is kicked out of the consumer group
Error:
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed
since the consumer is not part of an active group...
This happens when Stream Load takes longer than the Kafka poll interval (default: 5 minutes), causing the consumer to be evicted from the group.
Fix: Increase max.poll.interval.ms in connect-standalone.properties or connect-distributed.properties:
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000
JsonConverter schema error when reading JSON data
Error:
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable
requires "schema" and "payload" fields and may not contain additional fields.
This happens when JsonConverter is configured with schemas.enable=true (the default) but your Kafka messages contain plain JSON without a schema wrapper.
Fix: Disable schema enforcement in your connector configuration:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter.schemas.enable=false
What's next
Import data by using Stream Load: learn about Stream Load parameters you can set via sink.properties.*
Configuring connectors: full list of Kafka Connect Sink configuration options
Debezium connector for MySQL: full MySQL CDC configuration reference