Compatibility with Kafka

Last Updated: Aug 25, 2021

DataHub is fully compatible with the Kafka protocol. You can use the native Kafka client to read data from and write data to DataHub.

Background information

Mapping from Kafka to DataHub

Topic type

The topic expansion mode in Kafka is different from that in DataHub. To match the Kafka behavior, set the ExpandMode parameter to ONLY_EXTEND when you create a topic in DataHub. A topic whose ExpandMode parameter is set to ONLY_EXTEND does not support split or merge operations on shards. You can add shards but cannot remove them.

Topic naming

A topic name in Kafka maps to a project name and a topic name in DataHub, separated by a period (.). For example, the Kafka topic test_project.test_topic maps to the topic test_topic in the project test_project. If a topic name in Kafka contains multiple periods (.), the part before the first period (.) is the project name in DataHub and the remaining part is the topic name. The other periods (.) and any hyphens (-) are replaced by underscores (_).
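The naming rule can be sketched in a few lines of Java. The class and method names below (TopicNameMapping, projectOf, topicOf) are hypothetical and only model the rule for illustration; they are not part of DataHub SDK or the Kafka client. The sketch also assumes that hyphens in the project part are replaced in the same way as in the topic part.

```java
/** Illustrative model of the Kafka-to-DataHub topic naming rule (not an SDK API). */
public class TopicNameMapping {

    /** Returns the DataHub project name: the part before the first period. */
    public static String projectOf(String kafkaTopic) {
        return sanitize(kafkaTopic.substring(0, firstDot(kafkaTopic)));
    }

    /** Returns the DataHub topic name: everything after the first period. */
    public static String topicOf(String kafkaTopic) {
        return sanitize(kafkaTopic.substring(firstDot(kafkaTopic) + 1));
    }

    /** Finds the first period and rejects names without a project or topic part. */
    private static int firstDot(String kafkaTopic) {
        int dot = kafkaTopic.indexOf('.');
        if (dot <= 0 || dot == kafkaTopic.length() - 1) {
            throw new IllegalArgumentException("Expected <project>.<topic>, got: " + kafkaTopic);
        }
        return dot;
    }

    /** Replaces remaining periods and hyphens with underscores. */
    private static String sanitize(String part) {
        return part.replace('.', '_').replace('-', '_');
    }

    public static void main(String[] args) {
        String name = "test_project.order.events-v2";
        System.out.println(projectOf(name) + " / " + topicOf(name));
    }
}
```

With the sample name in main, the sketch resolves the project test_project and the topic order_events_v2.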

Partition

Each shard in the ACTIVE state in DataHub corresponds to a partition in Kafka. For example, if a DataHub topic has five active shards, the Kafka client sees five partitions, and you can specify a partition ID in the range [0,4] when you write data. If you do not specify a partition, the Kafka client determines the partition to which data is written.

TUPLE topic

When you write a key-value pair from Kafka to a TUPLE topic in DataHub, the schema of the TUPLE topic must contain one or two fields of the STRING type. Otherwise, data writes fail. If the schema contains one field, only the value of the key-value pair is written and the key is discarded. If the schema contains two fields, the value is written to one field and the key is written to the other field. If you write binary data to a TUPLE topic, the data appears as garbled text in the topic. We recommend that you write binary data to a BLOB topic.

BLOB topic

When you write a key-value pair from Kafka to a BLOB topic in DataHub, the value is written to the BLOB topic. If the key is not NULL, the key is written to DataHub as an attribute. The key of the attribute is __kafka_key__ and the value is the key of the Kafka data.

Header

A header in Kafka corresponds to an attribute in DataHub. However, a header whose value is NULL is ignored in Kafka. We recommend that you do not use __kafka_key__ as the key of a header.
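The following Java sketch models how a Kafka key and headers could end up as DataHub attributes, based only on the rules described in this section. It is not DataHub's implementation: the class AttributeMapping and its method are hypothetical, header values are simplified to strings (Kafka headers carry byte arrays), and letting the Kafka key win over a header named __kafka_key__ is an arbitrary choice because the source does not specify the outcome of such a collision.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative model of the key/header-to-attribute mapping (not an SDK API). */
public class AttributeMapping {

    static final String KAFKA_KEY_ATTRIBUTE = "__kafka_key__";

    /**
     * Builds the attribute map for one record.
     * Headers with a NULL value are skipped; a non-NULL key is stored
     * under the reserved attribute name __kafka_key__.
     */
    public static Map<String, String> toAttributes(String key, Map<String, String> headers) {
        Map<String, String> attributes = new LinkedHashMap<>();
        for (Map.Entry<String, String> header : headers.entrySet()) {
            if (header.getValue() != null) {
                attributes.put(header.getKey(), header.getValue());
            }
        }
        if (key != null) {
            // Assumption: the key overwrites a header with the same name.
            attributes.put(KAFKA_KEY_ATTRIBUTE, key);
        }
        return attributes;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("key1", "value1");
        headers.put("key2", null); // ignored because the value is NULL
        System.out.println(toAttributes("key", headers));
    }
}
```

The collision case is the reason to avoid __kafka_key__ as a header key: one of the two values would be lost.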

Kafka configuration parameters

C=Consumer, P=Producer, S=Streams

| Parameter | C/P/S | Valid value | Required | Description |
| --- | --- | --- | --- | --- |
| bootstrap.servers | * | For more information, see the "Kafka endpoints" section of this topic. | Yes | |
| security.protocol | * | SASL_SSL | Yes | To ensure secure data transmission, Secure Sockets Layer (SSL) is used for encryption when data is written from Kafka to DataHub. |
| sasl.mechanism | * | PLAIN | Yes | The AccessKey authentication mode. Set this parameter to PLAIN. |
| compression.type | P | lz4 | No | Specifies whether to enable compressed transmission. Only the LZ4 compression algorithm is supported. |
| group.id | C | project.topic:subId | Yes | The ID of the consumer group. Set this parameter based on the subscribed topic. Otherwise, data reads fail. |
| partition.assignment.strategy | C | org.apache.kafka.clients.consumer.RangeAssignor | No | The policy for partition assignment. The default policy in Kafka is RangeAssignor, which is also the only policy supported by DataHub. Do not modify this parameter. |
| session.timeout.ms | C/S | [60000,180000] | No | The timeout period of sessions. The default timeout period of sessions in Kafka is 10,000 milliseconds. However, the minimum timeout period of sessions in DataHub is 60,000 milliseconds. Therefore, the default value of this parameter is 60000. |
| heartbeat.interval.ms | C/S | We recommend two-thirds of the specified session timeout period. | No | The heartbeat interval. The default heartbeat interval in Kafka is 3,000 milliseconds. Because the default value of the session.timeout.ms parameter is 60000, we recommend that you set this parameter to 40000. Otherwise, heartbeat requests are excessively frequent. |
| application.id | S | project.topic:subId | Yes | The application ID. Set this parameter based on the subscribed topic. Otherwise, data reads fail. |

The preceding table describes the parameters that need special attention when you use a Kafka client with DataHub. Other client-side parameters, such as retries and batch.size, are not affected. Server-related parameters do not change the behavior of the server. For example, regardless of the value of the acks parameter, DataHub returns a response only after the data write is complete.
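As a rough illustration of how the consumer-side constraints fit together, the following Java sketch builds a Properties object with the required settings, checks the session timeout against the [60000,180000] range, and derives the heartbeat interval as two-thirds of the session timeout. The class DataHubConsumerProps and its build method are hypothetical helpers, not part of any SDK; deserializers and other settings still have to be added before the properties are passed to a Kafka consumer.

```java
import java.util.Properties;

/** Hypothetical helper that assembles the DataHub-specific consumer settings. */
public class DataHubConsumerProps {

    static final int MIN_SESSION_TIMEOUT_MS = 60_000;
    static final int MAX_SESSION_TIMEOUT_MS = 180_000;

    public static Properties build(String endpoint, String project, String topic,
                                   String subId, int sessionTimeoutMs) {
        if (sessionTimeoutMs < MIN_SESSION_TIMEOUT_MS || sessionTimeoutMs > MAX_SESSION_TIMEOUT_MS) {
            throw new IllegalArgumentException(
                    "session.timeout.ms must be in [60000,180000], got " + sessionTimeoutMs);
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", endpoint);
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        // group.id must follow the project.topic:subId format.
        properties.put("group.id", project + "." + topic + ":" + subId);
        properties.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        // Recommended heartbeat interval: two-thirds of the session timeout.
        properties.put("heartbeat.interval.ms", String.valueOf(sessionTimeoutMs * 2 / 3));
        return properties;
    }

    public static void main(String[] args) {
        Properties p = build("dh-cn-hangzhou.aliyuncs.com:9092",
                "test_project", "test_topic", "1611039998153N71KM", 60_000);
        p.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

For a session timeout of 60000, the sketch sets heartbeat.interval.ms to 40000, which matches the recommendation in the table.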

Kafka endpoints

| Region | Region ID | Public endpoint | ECS endpoint of the classic network | ECS endpoint in the VPC |
| --- | --- | --- | --- | --- |
| China (Hangzhou) | cn-hangzhou | dh-cn-hangzhou.aliyuncs.com:9092 | dh-cn-hangzhou.aliyun-inc.com:9093 | dh-cn-hangzhou-int-vpc.aliyuncs.com:9094 |
| China (Shanghai) | cn-shanghai | dh-cn-shanghai.aliyuncs.com:9092 | dh-cn-shanghai.aliyun-inc.com:9093 | dh-cn-shanghai-int-vpc.aliyuncs.com:9094 |
| China (Beijing) | cn-beijing | dh-cn-beijing.aliyuncs.com:9092 | dh-cn-beijing.aliyun-inc.com:9093 | dh-cn-beijing-int-vpc.aliyuncs.com:9094 |
| China (Shenzhen) | cn-shenzhen | dh-cn-shenzhen.aliyuncs.com:9092 | dh-cn-shenzhen.aliyun-inc.com:9093 | dh-cn-shenzhen-int-vpc.aliyuncs.com:9094 |
| China (Zhangjiakou) | cn-zhangjiakou | dh-cn-zhangjiakou.aliyuncs.com:9092 | dh-cn-zhangjiakou.aliyun-inc.com:9093 | dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094 |
| Singapore (Singapore) | ap-southeast-1 | dh-ap-southeast-1.aliyuncs.com:9092 | dh-ap-southeast-1.aliyun-inc.com:9093 | dh-ap-southeast-1-int-vpc.aliyuncs.com:9094 |
| Malaysia (Kuala Lumpur) | ap-southeast-3 | dh-ap-southeast-3.aliyuncs.com:9092 | dh-ap-southeast-3.aliyun-inc.com:9093 | dh-ap-southeast-3-int-vpc.aliyuncs.com:9094 |
| India (Mumbai) | ap-south-1 | dh-ap-south-1.aliyuncs.com:9092 | dh-ap-south-1.aliyun-inc.com:9093 | dh-ap-south-1-int-vpc.aliyuncs.com:9094 |
| Germany (Frankfurt) | eu-central-1 | dh-eu-central-1.aliyuncs.com:9092 | dh-eu-central-1.aliyun-inc.com:9093 | dh-eu-central-1-int-vpc.aliyuncs.com:9094 |
| China East 2 Finance | cn-shanghai-finance-1 | dh-cn-shanghai-finance-1.aliyuncs.com:9092 | dh-cn-shanghai-finance-1.aliyun-inc.com:9093 | dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094 |
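The endpoints listed above follow a regular pattern per network type, which the following Java sketch captures. The class KafkaEndpoints and its Network enum are hypothetical; the pattern is inferred only from the regions listed in this topic, so check the table before you rely on it for a region that is not listed.

```java
/** Hypothetical helper that derives a Kafka endpoint from a region ID. */
public class KafkaEndpoints {

    public enum Network { PUBLIC, CLASSIC, VPC }

    /** Builds the endpoint string using the pattern seen in the endpoint table. */
    public static String of(String regionId, Network network) {
        switch (network) {
            case PUBLIC:
                return "dh-" + regionId + ".aliyuncs.com:9092";
            case CLASSIC:
                return "dh-" + regionId + ".aliyun-inc.com:9093";
            case VPC:
                return "dh-" + regionId + "-int-vpc.aliyuncs.com:9094";
            default:
                throw new IllegalArgumentException("Unknown network type: " + network);
        }
    }

    public static void main(String[] args) {
        System.out.println(of("cn-hangzhou", Network.PUBLIC));
        System.out.println(of("cn-hangzhou", Network.VPC));
    }
}
```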

Examples

Create a topic

Create a topic in the DataHub console

Create a topic by using code

Note: You cannot create a topic by calling an API operation of Kafka. To create a topic, you must use DataHub SDK. When you create a topic, set the ExpandMode parameter to ONLY_EXTEND.

The version of the DataHub SDK Maven dependency must be 2.19.0 or later.

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.19.0-public</version>
</dependency>

public class CreateTopic {
    public static void main(String[] args) {
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig("https://dh-cn-hangzhou.aliyuncs.com",
                                new AliyunAccount("accessId", "accessKey")))
                .build();

        int shardCount = 1;
        int lifeCycle = 7;

        try {
            datahubClient.createTopic("test_project", "test_topic", shardCount, lifeCycle, RecordType.BLOB, "comment", ExpandMode.ONLY_EXTEND);
        } catch (DatahubClientException e) {
            e.printStackTrace();
        }
    }
}

Sample producer

Generate the kafka_client_producer_jaas.conf file

Create the kafka_client_producer_jaas.conf file and save it to a directory. The file contains the following content:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};
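As an alternative to a separate JAAS file, Kafka clients of version 0.10.2 and later also accept the same login module line through the sasl.jaas.config client property. The following Java sketch only builds that string; the class JaasConfigSketch is hypothetical, the approach is not described in the original procedure and has not been verified against DataHub here, and the sketch does not escape quotation marks inside the credentials.

```java
import java.util.Properties;

/** Hypothetical helper that builds an inline JAAS entry for the PLAIN mechanism. */
public class JaasConfigSketch {

    /** Returns the value for the sasl.jaas.config client property. */
    public static String plainJaasConfig(String accessId, String accessKey) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + accessId + "\" "
                + "password=\"" + accessKey + "\";";
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        // Replaces the java.security.auth.login.config system property.
        properties.put("sasl.jaas.config", plainJaasConfig("accessId", "accessKey"));
        System.out.println(properties.getProperty("sasl.jaas.config"));
    }
}
```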

Maven dependency

The version of the Kafka client must be 0.10.0.0 or later. The recommended version is 2.4.0.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

Sample code

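import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

public class ProducerExample {
    static {
        // Point the JVM at the JAAS file that holds the AccessKey pair.
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("compression.type", "lz4");

        // Kafka topic name = <DataHub project>.<DataHub topic>
        String kafkaTopicName = "test_project.test_topic";
        Producer<String, String> producer = new KafkaProducer<>(properties);

        try {
            // Headers are written to DataHub as attributes.
            List<Header> headers = new ArrayList<>();
            headers.add(new RecordHeader("key1", "value1".getBytes(StandardCharsets.UTF_8)));
            headers.add(new RecordHeader("key2", "value2".getBytes(StandardCharsets.UTF_8)));

            // Write to partition 0.
            ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopicName, 0, "key", "Hello DataHub!", headers);

            // Synchronous send: block until the write is acknowledged.
            producer.send(record).get();

        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the error.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}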

Execution results

After the execution is complete, sample the data in the DataHub topic to check whether the data was written as expected.

Sample consumer

For information about how to generate the kafka_client_producer_jaas.conf file and add the Maven dependency, see the "Sample producer" section of this topic. The consumer uses the same file and dependency.

Sample code

After you add a consumer, wait about one minute for the shard allocation to be complete. Then, the consumer can consume data.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerExample {

    static {
        // Point the JVM at the JAAS file that holds the AccessKey pair.
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        // group.id format: project.topic:subId
        properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        // Disable hostname verification (see the FAQ entry about SSL handshake failures).
        properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer if the loop exits with an exception.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));

            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.toString());
                }
            }
        }
    }
}

Execution results

After the execution is complete, you can view the read data on the consumer client.

ConsumerRecord(topic = test_project.test_topic, partition = 0, leaderEpoch = 0, offset = 0, LogAppendTime = 1611040892661, serialized key size = 3, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false), key = key, value = Hello DataHub!)

Note: All data returned for a data read request shares the same value of the LogAppendTime parameter, which is the greatest value of the timestamps of the data.
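The effect of the shared LogAppendTime value can be illustrated with a small Java sketch. The class SharedTimestampSketch is hypothetical and only models the rule stated in the note (the shared value is the greatest write timestamp in the response); it does not call DataHub or Kafka, and the timestamps in main are made-up sample values.

```java
/** Illustrative model of the shared LogAppendTime value in one read response. */
public class SharedTimestampSketch {

    /** Returns the timestamp that every record in one read response carries. */
    public static long sharedTimestamp(long[] writeTimestamps) {
        if (writeTimestamps.length == 0) {
            throw new IllegalArgumentException("A response must contain at least one record");
        }
        long max = writeTimestamps[0];
        for (long timestamp : writeTimestamps) {
            max = Math.max(max, timestamp);
        }
        return max;
    }

    public static void main(String[] args) {
        // Hypothetical write times of three records returned by one read request.
        long[] writeTimestamps = {1611040892601L, 1611040892661L, 1611040892630L};
        long shared = sharedTimestamp(writeTimestamps);
        for (long actual : writeTimestamps) {
            System.out.println("actual=" + actual + " reported=" + shared);
        }
    }
}
```

Every record is reported with the latest write time in the response, so the reported value is never earlier than the actual write time of a record.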

Sample Streams task

Maven dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.4.0</version>
</dependency>

Sample code

The following sample code reads data from the input topic in the test_project project, converts each key and value to lowercase, and then writes the converted data to the output topic.

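import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;

public class StreamExample {

    static {
        // Point the JVM at the JAAS file that holds the AccessKey pair.
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(final String[] args) {
        final String input = "test_project.input";
        final String output = "test_project.output";
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        // application.id format: project.topic:subId
        properties.put("application.id", "test_project.input:1611293595417QH0WL");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("auto.offset.reset", "earliest");

        // Topology: input topic -> lowercase key and value -> output topic.
        final StreamsBuilder builder = new StreamsBuilder();
        TestMapper testMapper = new TestMapper();
        builder.stream(input, Consumed.with(Serdes.String(), Serdes.String()))
                .map(testMapper)
                .to(output, Produced.with(Serdes.String(), Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the Streams task cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    // Null-safe lowercase conversion, so that no extra utility library is required.
    private static String lowerCase(String s) {
        return s == null ? null : s.toLowerCase();
    }

    static class TestMapper implements KeyValueMapper<String, String, KeyValue<String, String>> {

        @Override
        public KeyValue<String, String> apply(String key, String value) {
            return new KeyValue<>(lowerCase(key), lowerCase(value));
        }
    }
}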

Execution results

After you start a Streams task, wait about one minute for the shard allocation to be complete. Then, you can view the number of tasks in the DataHub console. The number of tasks is consistent with the number of shards in the input topic. In this example, the input topic contains three shards.

currently assigned active tasks: [0_0, 0_1, 0_2]
    currently assigned standby tasks: []
    revoked active tasks: []
    revoked standby tasks: []

After the shard allocation is complete, you can write the following test data to the input topic: (AAAA,BBBB), (CCCC,DDDD), and (EEEE,FFFF). Then, sample the output data to check whether the data write is valid.
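To know what to look for when you sample the output topic, the following Java sketch applies the same lowercase conversion to the three test pairs locally. The class ExpectedStreamOutput is hypothetical and does not connect to DataHub; it only mirrors the mapping step of the Streams task above.

```java
/** Local model of the lowercase mapping applied by the sample Streams task. */
public class ExpectedStreamOutput {

    /** Returns the converted {key, value} pair; NULL inputs stay NULL. */
    public static String[] map(String key, String value) {
        return new String[] {lower(key), lower(value)};
    }

    private static String lower(String s) {
        return s == null ? null : s.toLowerCase();
    }

    public static void main(String[] args) {
        String[][] testData = {{"AAAA", "BBBB"}, {"CCCC", "DDDD"}, {"EEEE", "FFFF"}};
        for (String[] pair : testData) {
            String[] converted = map(pair[0], pair[1]);
            System.out.println("(" + converted[0] + "," + converted[1] + ")");
        }
    }
}
```

For the test data above, the expected records in the output topic are (aaaa,bbbb), (cccc,dddd), and (eeee,ffff).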

Note

  • Transactions and idempotence are not supported.

  • A Kafka client cannot automatically create a DataHub topic. Before you write data from Kafka to DataHub, make sure that a topic is created in DataHub.

  • Each consumer can subscribe to only one topic.

  • The timestamp of the data read by a consumer is the value of the LogAppendTime parameter, which indicates the time when the data was written to DataHub. All data returned for a data read request shares the same timestamp, which is the greatest value of the timestamps of the data. Therefore, when you read data, the obtained timestamp may be greater than the actual timestamp when the data was written to DataHub.

  • Each Streams task supports only one input topic and multiple output topics.

  • Streams tasks are stateless.

  • Supported Kafka versions are from 0.10.0 to 2.4.0.

FAQ

Q: A connection is closed during a data write.

Selector - [Producer clientId=producer-1] Connection with dh-cn-shenzhen.aliyuncs.com/120.25.112.216 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:573)
    ...

A: In Kafka, a metadata request and a data write request do not use the same connection. The first metadata request establishes a connection to the bootstrap endpoint. The first data write request then establishes a second connection to the broker that was returned in the metadata response. All subsequent requests are sent over the second connection, so the first connection becomes idle. If a connection remains idle beyond a time limit, the server automatically closes it. Therefore, you can ignore this error if it does not affect data writes.

Q: The Kafka client fails to start.

Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address 100.67.134.161 found

A: Add the following line to the client configuration: properties.put("ssl.endpoint.identification.algorithm", "");

Q: The DisconnectException error appears during data consumption on a consumer client.

[INFO][Consumer clientId=client-id, groupId=consumer-project.topic:subid] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: {}.
org.apache.kafka.common.errors.DisconnectException

A: The Kafka client must maintain a TCP-based persistent connection to the server. In general, the DisconnectException error is caused by network jitter. This error does not affect data consumption on the client because retry logic is configured on the client.