
Last Updated:Jun 25, 2026

DataHub Kafka compatibility mode

DataHub is now compatible with the Apache Kafka protocol. You can use a standard Kafka SDK to connect to DataHub to publish and subscribe to data.

Concept mapping between DataHub and Kafka

Kafka | DataHub
--- | ---
topic | project.topic
partition | shard
offset | sequence

Kafka topic

DataHub also has topics. However, DataHub includes an additional resource layer called a project, which does not exist in Kafka. To ensure compatibility, a Kafka topic corresponds to a combination of a DataHub project and topic, joined by a period (.). For example, if you have a project named test_project and a topic named test_topic in DataHub, the corresponding Kafka topic name is test_project.test_topic.

Kafka partition

A Kafka partition is directly equivalent to a DataHub shard. Both represent an ordered queue of data.

Kafka consumer group

A DataHub consumer group behaves similarly to a Kafka consumer group. You can create a consumer group within a project and associate it with the topics you want to subscribe to. This allows the group to subscribe to multiple topics within that project. If a consumer group is bound to a topic, a subscription is automatically created, which you can view on the subscription list page of the topic. Deleting this subscription prevents the group from subscribing to the topic and erases all previous consumer offsets.

Because a consumer group is a sub-resource of a project, you must specify it along with the project. For example, if your DataHub project is test_project and your consumer group is test_group, the Kafka group name is test_project.test_group.
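The naming rules for topics and consumer groups can be sketched as a small helper. This is only an illustration: the class DataHubKafkaNames and its methods are hypothetical names introduced here and are not part of any DataHub or Kafka SDK.

```java
import java.util.Objects;

/** Hypothetical helper (not part of any SDK) that builds the Kafka-side names. */
final class DataHubKafkaNames {
    private DataHubKafkaNames() {}

    /** Joins a DataHub project and a resource name with a period. */
    private static String qualify(String project, String resource) {
        Objects.requireNonNull(project, "project");
        Objects.requireNonNull(resource, "resource");
        if (project.isEmpty() || resource.isEmpty()) {
            throw new IllegalArgumentException("project and resource must be non-empty");
        }
        return project + "." + resource;
    }

    /** Kafka topic name for a DataHub project and topic. */
    static String kafkaTopic(String project, String topic) {
        return qualify(project, topic);
    }

    /** Kafka group name (group.id) for a DataHub project and consumer group. */
    static String kafkaGroup(String project, String group) {
        return qualify(project, group);
    }
}
```

For example, kafkaTopic("test_project", "test_topic") yields the name to pass to subscribe or send, and kafkaGroup("test_project", "test_group") yields the value for group.id.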

In the Create consumer group dialog box, enter a Name and Description, select the topics to subscribe to in the Topic area, click the > button to add them to the selected list on the right, and then click Create.

Each group can subscribe to a maximum of 50 topics. If you need to subscribe to more, please submit a ticket.

Kafka Record

A Kafka record consists of two parts: headers and a key-value pair. DataHub records, in contrast, come in two formats: Tuple and Blob. Tuple records contain strongly structured data, and Blob records contain binary data.

Kafka Header

A Kafka header can add additional information to data, which serves the same purpose as a DataHub attribute. If you use a Kafka client to write data that includes headers, the header information is stored as DataHub attributes. If the value of a Kafka header is NULL, the corresponding header is ignored. Do not use "__kafka_key__" as a header key because it is a reserved internal key.

Kafka key-value data

  • If a DataHub topic is of the Tuple type, the Kafka key maps to the first STRING column and the Kafka value maps to the second STRING column.

  • If a DataHub topic is of the Blob type, the Kafka value maps to the data content, and the Kafka key is stored in an attribute in the format <"__kafka_key__", key>.
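The header and key rules for Blob topics can be modeled with a short sketch. It is an illustration of the documented mapping, not DataHub's implementation. The class BlobRecordMapping is hypothetical, header values are simplified to strings (real Kafka header values are byte arrays), and two behaviors are assumptions of this sketch: a reserved header key is rejected with an exception, and a null Kafka key produces no attribute.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative model only: mirrors the documented mapping, not DataHub's internal code. */
final class BlobRecordMapping {
    static final String RESERVED_KEY = "__kafka_key__";

    private BlobRecordMapping() {}

    /** Returns the attributes a Blob record would carry for the given Kafka key and headers. */
    static Map<String, String> toAttributes(String kafkaKey, Map<String, String> headers) {
        Map<String, String> attributes = new LinkedHashMap<>();
        for (Map.Entry<String, String> header : headers.entrySet()) {
            if (RESERVED_KEY.equals(header.getKey())) {
                // Assumption of this sketch: fail fast instead of overwriting the key.
                throw new IllegalArgumentException(RESERVED_KEY + " is a reserved header key");
            }
            if (header.getValue() != null) {
                // Headers with a NULL value are ignored.
                attributes.put(header.getKey(), header.getValue());
            }
        }
        if (kafkaKey != null) {
            // The Kafka key is stored as the attribute <"__kafka_key__", key>.
            attributes.put(RESERVED_KEY, kafkaKey);
        }
        return attributes;
    }
}
```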

Kafka offset

A Kafka offset is a 64-bit integer. Within a partition, offsets start at 0 and increment automatically, so each record has a unique offset. The offset is primarily used to record the consumer position. A DataHub sequence has the same meaning, so you can treat it as directly equivalent to a Kafka offset.

Limits

DataHub does not support Kafka transactions, idempotence, Schema Registry, or log compaction.
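A client configuration can be checked against these limits before it is used. The following is a minimal sketch under stated assumptions: the class UnsupportedFeatureCheck is hypothetical, it only inspects string-valued properties, and it covers only the standard Kafka producer settings transactional.id and enable.idempotence. An unset enable.idempotence is treated as disabled, so on client versions that enable idempotence by default you still need to set it to false explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for settings that rely on unsupported features. */
final class UnsupportedFeatureCheck {
    private UnsupportedFeatureCheck() {}

    /** Returns one message per setting that depends on a feature DataHub does not support. */
    static List<String> findProblems(Properties clientConfig) {
        List<String> problems = new ArrayList<>();
        // Setting transactional.id turns on Kafka transactions in the producer.
        if (clientConfig.getProperty("transactional.id") != null) {
            problems.add("transactional.id is set, but transactions are not supported");
        }
        // Only an explicit "true" is flagged; an unset value is not inspected here.
        if (Boolean.parseBoolean(clientConfig.getProperty("enable.idempotence", "false"))) {
            problems.add("enable.idempotence is true, but idempotence is not supported");
        }
        return problems;
    }
}
```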

Quick Start

  1. Use a standard Kafka client. Version 2.4.0 is recommended; client versions 0.10.0 to 4.0 are compatible.

  2. Create the corresponding project, topic, and group resources in DataHub.

  3. Set the security protocol to SASL_SSL, use PLAIN as the SASL mechanism, and authenticate with your Alibaba Cloud AccessKey pair (AccessKey ID and AccessKey secret).

  4. Replace the Kafka broker address with the DataHub endpoint for your region (see Service Domain Name List).

Resource creation

First, log on to the DataHub console and create a project.

In the New Project dialog box, set Name to test_kafka_project and Description to test kafka.

Create a topic

On the New Topic page of the DataHub console, select Direct Create as the creation method, enter test_kafka as the name, and select TUPLE as the type. In the Schema details section, add two fields named key and value, set the type for both to STRING, and select Allow NULL. Set Shard Count to 1 and Lifecycle to 3, enable Shard Scaling Mode, and disable Multi-version. Enter a description and click Create.

Create a group and bind the topics that you need to consume. You can also modify the list of bound topics after the group is created. If you do not need to consume messages, you can skip this step.

In the New Group panel, enter a Name and Description, use the transfer list to move the topics to consume from the available list on the left to the selected list on the right, and then click Create.

Authentication Method

Create a file named kafka_client_producer_jaas.conf and save it to any directory. The file must contain the following content:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="YOUR_ACCESS_ID"
  password="YOUR_ACCESS_KEY";
};
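Kafka clients from version 0.10.2 onward also accept the same login module entry inline through the sasl.jaas.config client property, which avoids a separate file. The sketch below shows that alternative; the class InlineJaasConfig is a hypothetical name, and the code assumes the AccessKey ID and secret contain no double quotes or backslashes.

```java
import java.util.Properties;

/** Hypothetical helper that supplies PLAIN credentials without a JAAS file. */
final class InlineJaasConfig {
    private InlineJaasConfig() {}

    /** Builds the PlainLoginModule entry in the format expected by sasl.jaas.config. */
    static String plainJaas(String accessKeyId, String accessKeySecret) {
        // Assumes neither value contains a double quote or a backslash.
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + accessKeyId + "\" "
                + "password=\"" + accessKeySecret + "\";";
    }

    /** Returns a copy of the given client properties with inline SASL credentials added. */
    static Properties withInlineJaas(Properties base, String accessKeyId, String accessKeySecret) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.put("sasl.jaas.config", plainJaas(accessKeyId, accessKeySecret));
        return copy;
    }
}
```

With this property set, the java.security.auth.login.config system property used in the examples is not needed for that client. Load the credentials from a secure source rather than hard-coding them.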

Producer example

Kafka client pom.xml file:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.4.0</version>
</dependency>

ProducerExample
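The producer code is not reproduced here, so the following is a minimal sketch of the producer settings rather than the original example. The class ProducerSettings is a hypothetical name. The property keys are standard Kafka producer settings, and the values follow the Configuration Overview in the Appendix; the compression value is written in lowercase, which is the form Kafka clients accept.

```java
import java.util.Properties;

/** Hypothetical helper that assembles producer settings for DataHub. */
final class ProducerSettings {
    private ProducerSettings() {}

    /** Builds producer properties for the given DataHub endpoint (host:port). */
    static Properties forEndpoint(String endpoint) {
        Properties props = new Properties();
        props.put("bootstrap.servers", endpoint);
        // Credentials come from the JAAS file referenced by java.security.auth.login.config.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Optional: LZ4 is the only compression type listed as supported.
        props.put("compression.type", "lz4");
        // Idempotence is not supported; newer clients enable it by default.
        props.put("enable.idempotence", "false");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

Pass the result to new KafkaProducer<String, String>(props) and send records with new ProducerRecord<>("test_kafka_project.test_kafka", key, value), where the topic name uses the project.topic form. The consumer example in this document additionally sets ssl.endpoint.identification.algorithm to an empty string; this sketch leaves that setting out.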

Consumer example

ConsumerExample

package com.aliyun.datahub.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ConsumerExample {
    static {
        // Point the JVM at the JAAS file that holds the AccessKey pair.
        System.setProperty("java.security.auth.login.config", "/path/to/your/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        // DataHub endpoint for the region; see Service Domain Name List.
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        // Consumer group in project.group form.
        properties.put("group.id", "test_project.test_kafka_group");
        properties.put("auto.offset.reset", "earliest");
        // DataHub enforces a minimum session timeout of 60000 ms;
        // the heartbeat interval is two-thirds of the session timeout.
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        // An empty value disables server hostname verification.
        properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // Topics use the project.topic form and must be bound to the consumer group.
        List<String> topicList = new ArrayList<>();
        topicList.add("test_project.test_topic1");
        topicList.add("test_project.test_topic2");
        topicList.add("test_project.test_topic3");
        kafkaConsumer.subscribe(topicList);

        // Poll indefinitely and print every record received.
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }
    }
}

Streams Examples

This example reads data from the input topic in the test_project project, converts the key and value strings to lowercase, and writes the result to the output topic.

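// Requires the kafka-streams artifact in addition to kafka-clients.
// Assumption: StringUtils is org.apache.commons.lang3.StringUtils (Apache Commons Lang 3).
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class StreamExample {
    static {
        // Point the JVM at the JAAS file that holds the AccessKey pair.
        System.setProperty("java.security.auth.login.config", "/path/to/your/kafka_client_producer_jaas.conf");
    }

    public static void main(final String[] args) {
        // Input and output topics in project.topic form.
        final String input = "test_project.input";
        final String output = "test_project.output";
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        // application.id in project.group form (recommended).
        properties.put("application.id", "test_project.test_kafka_group");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("auto.offset.reset", "earliest");

        // Topology: read from input, lowercase key and value, write to output.
        final StreamsBuilder builder = new StreamsBuilder();
        TestMapper testMapper = new TestMapper();
        builder.stream(input, Consumed.with(Serdes.String(), Serdes.String()))
                .map(testMapper)
                .to(output, Produced.with(Serdes.String(), Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        final CountDownLatch latch = new CountDownLatch(1);
        // Close the Streams instance cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    // Converts both the key and the value of each record to lowercase.
    static class TestMapper implements KeyValueMapper<String, String, KeyValue<String, String>> {
        @Override
        public KeyValue<String, String> apply(String s, String s2) {
            return new KeyValue<>(StringUtils.lowerCase(s), StringUtils.lowerCase(s2));
        }
    }
}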

After you start the Streams job, it takes about one minute to assign the shards. You can then see the current number of tasks in the console. The number of tasks matches the number of shards in the input topic. In this example, the input topic has three shards.

The output sampling result shows that the data was written correctly. After being processed by the TestMapper in the Streams task, the uppercase input was converted to lowercase output: the keys are aaaa, cccc, and eeee, and the corresponding values are bbbb, dddd, and ffff.

Migrate data from self-managed Kafka to DataHub

  1. Switch the broker address to the DataHub endpoint for your region. For more information, see Service Domain Name List in the Appendix.

  2. Create resources in DataHub and modify the resource names in your code. For more information, see the resource creation section above.

  3. Switch the authentication method to SASL_SSL. The authentication mechanism is PLAIN. Set username to your Alibaba Cloud AccessKey ID (AK) and password to your AccessKey Secret (SK).

Appendix

Configuration Overview

C = Consumer, P = Producer, S = Streams, * = all client types

Parameter | C/P/S | Recommended value | Required | Description
--- | --- | --- | --- | ---
bootstrap.servers | * | See Service Domain Name List. | Yes |
security.protocol | * | SASL_SSL | Yes | To ensure secure data transmission, writing data from Kafka to DataHub uses SSL encryption by default.
sasl.mechanism | * | PLAIN | Yes | AccessKey authentication. Only PLAIN is supported.
compression.type | P | lz4 | No | Specifies whether to enable compression for data transmission. Currently, only LZ4 is supported.
enable.idempotence | P | false | No | Kafka clients version 3.0.1 and later enable idempotence by default. Because DataHub does not support idempotence, you must manually disable this feature. This configuration is not required for clients earlier than version 3.0.1.
group.id | C | project.group | Yes |
partition.assignment.strategy | C | org.apache.kafka.clients.consumer.RangeAssignor | No | The default value for Kafka is RangeAssignor. DataHub currently supports only RangeAssignor. Do not change this configuration.
session.timeout.ms | C/S | [60000, 180000] | No | The default value in Kafka is 10000. However, DataHub enforces a minimum of 60000, so the value defaults to 60000.
heartbeat.interval.ms | C/S | Two-thirds of the session.timeout.ms value. | No | The default value in Kafka is 3000. Because session.timeout.ms defaults to 60000, explicitly set this value to 40000 to prevent frequent heartbeat requests.
application.id | S | project.topic:subId or project.group | Yes | If you use project.topic:subId, it must match the subscribed topic. Otherwise, data cannot be read. We recommend using project.group.
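The relationship between session.timeout.ms and heartbeat.interval.ms can be captured in a small helper. This is a sketch, not a required utility: the class ConsumerTimeouts is a hypothetical name, and rejecting out-of-range values with an exception is a choice made here for illustration.

```java
import java.util.Properties;

/** Hypothetical helper that applies the recommended timeout settings. */
final class ConsumerTimeouts {
    static final int MIN_SESSION_TIMEOUT_MS = 60_000;
    static final int MAX_SESSION_TIMEOUT_MS = 180_000;

    private ConsumerTimeouts() {}

    /** Recommended heartbeat interval: two-thirds of the session timeout. */
    static int heartbeatFor(int sessionTimeoutMs) {
        return (int) (sessionTimeoutMs * 2L / 3);
    }

    /** Sets session.timeout.ms and the matching heartbeat.interval.ms on the given properties. */
    static void apply(Properties props, int sessionTimeoutMs) {
        if (sessionTimeoutMs < MIN_SESSION_TIMEOUT_MS || sessionTimeoutMs > MAX_SESSION_TIMEOUT_MS) {
            throw new IllegalArgumentException(
                    "session.timeout.ms must be within [60000, 180000]: " + sessionTimeoutMs);
        }
        props.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        props.put("heartbeat.interval.ms", String.valueOf(heartbeatFor(sessionTimeoutMs)));
    }
}
```

Calling apply(props, 60000) sets heartbeat.interval.ms to 40000, which matches the values used in the consumer and Streams examples.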

Service Domain Name List

Region name | Region | Public endpoint | VPC ECS endpoint
--- | --- | --- | ---
China East 1 (Hangzhou) | cn-hangzhou | dh-cn-hangzhou.aliyuncs.com:9092 | dh-cn-hangzhou-int-vpc.aliyuncs.com:9094
China East 2 (Shanghai) | cn-shanghai | dh-cn-shanghai.aliyuncs.com:9092 | dh-cn-shanghai-int-vpc.aliyuncs.com:9094
China North 2 (Beijing) | cn-beijing | dh-cn-beijing.aliyuncs.com:9092 | dh-cn-beijing-int-vpc.aliyuncs.com:9094
China (Ulanqab) | cn-wulanchabu | dh-cn-wulanchabu.aliyuncs.com:9092 | dh-cn-wulanchabu-int-vpc.aliyuncs.com:9094
China South 1 (Shenzhen) | cn-shenzhen | dh-cn-shenzhen.aliyuncs.com:9092 | dh-cn-shenzhen-int-vpc.aliyuncs.com:9094
China North 3 (Zhangjiakou) | cn-zhangjiakou | dh-cn-zhangjiakou.aliyuncs.com:9092 | dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094
Asia Pacific SE 1 (Singapore) | ap-southeast-1 | dh-ap-southeast-1.aliyuncs.com:9092 | dh-ap-southeast-1-int-vpc.aliyuncs.com:9094
Asia Pacific SE 3 (Kuala Lumpur) | ap-southeast-3 | dh-ap-southeast-3.aliyuncs.com:9092 | dh-ap-southeast-3-int-vpc.aliyuncs.com:9094
Europe Central 1 (Frankfurt) | eu-central-1 | dh-eu-central-1.aliyuncs.com:9092 | dh-eu-central-1-int-vpc.aliyuncs.com:9094
China East 2 (Shanghai) Finance | cn-shanghai-finance-1 | dh-cn-shanghai-finance-1.aliyuncs.com:9092 | dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094
China (Hong Kong) | cn-hongkong | dh-cn-hongkong.aliyuncs.com:9092 | dh-cn-hongkong-int-vpc.aliyuncs.com:9094
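Every endpoint listed above follows the same pattern, so the value for bootstrap.servers can be derived from the region ID. The sketch below relies only on that observed pattern. The class DataHubKafkaEndpoints is a hypothetical name, and the helper deliberately rejects region IDs that are not in the list rather than guessing an endpoint for them.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical lookup for the endpoints listed in this document. */
final class DataHubKafkaEndpoints {
    // Region IDs copied from the Service Domain Name List.
    private static final Set<String> REGIONS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            "cn-hangzhou", "cn-shanghai", "cn-beijing", "cn-wulanchabu", "cn-shenzhen",
            "cn-zhangjiakou", "ap-southeast-1", "ap-southeast-3", "eu-central-1",
            "cn-shanghai-finance-1", "cn-hongkong")));

    private DataHubKafkaEndpoints() {}

    /** Public endpoint (port 9092) for a listed region. */
    static String publicEndpoint(String regionId) {
        requireListed(regionId);
        return "dh-" + regionId + ".aliyuncs.com:9092";
    }

    /** VPC ECS endpoint (port 9094) for a listed region. */
    static String vpcEndpoint(String regionId) {
        requireListed(regionId);
        return "dh-" + regionId + "-int-vpc.aliyuncs.com:9094";
    }

    private static void requireListed(String regionId) {
        if (!REGIONS.contains(regionId)) {
            throw new IllegalArgumentException("Region is not in the documented list: " + regionId);
        }
    }
}
```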

Kafka API compatibility

The official Kafka documentation lists all available APIs. The following table lists the APIs that DataHub is compatible with.

API | Description
--- | ---
Produce | Writes data.
Fetch | Reads data.
ListOffsets | Gets offsets based on time. Earlier versions return a list of offsets, while later versions return a single offset.
Metadata | Gets metadata for read and write operations.
OffsetCommit | Commits an offset.
OffsetFetch | Gets an offset.
FindCoordinator | Finds the broker that hosts the coordinator and returns its virtual IP (VIP) address.
JoinGroup | Joins a group.
Heartbeat | Sends a heartbeat.
LeaveGroup | Leaves a group.
SyncGroup | Used by the group leader to send the partition assignment plan, and by all members to receive it.
SaslHandshake | Handles the Simple Authentication and Security Layer (SASL) handshake for authentication.
ApiVersions | Gets all available APIs.
CreateTopics | Creates a topic.
DeleteTopics | Deletes a topic.
OffsetForLeaderEpoch | Gets the latest offset.
SaslAuthenticate | Authenticates the client using SASL.
CreatePartitions | Adds a partition.
DeleteGroups | Deletes a group.
OffsetDelete | Deletes an offset.