DataHub已經完全相容Kafka協議,您可以使用原生Kafka用戶端對DataHub進行讀寫操作。
相關介紹
Kafka映射DataHub介紹
Topic類型
Kafka的Topic擴容方式和DataHub的topic擴容方式不同,為了適配Kafka的topic擴容方式,DataHub建立topic時需要將擴容方式選為擴充模式。擴充模式的topic,不再支援分裂/合併作業,而是添加shard的方式,暫不支援減少shard。
Topic命名
Kafka的Topic映射之後為DataHub的project+topic,project和topic以 “.”分割,例如:test_project.test_topic對應到DataHub中Project為test_project,Topic為test_topic,如果含有多個“.”,會以首個“.”分割Project和Topic,多餘的“.”和”-“會被替換為“_“。
Partition
DataHub的每個處於Active狀態shard對應Kafka的1個Partition,如果當前Active狀態shard為5個,那麼就可以視為Kafka有5個Partition,寫入資料時,可以指定Partition範圍為[0,4],如果不指定,則會由kafka用戶端選擇Partition。
Tuple Topic
Kafka的資料寫入Tuple Topic時,Topic Schema必須為2列或1列,類型必須為STRING,其他情況會寫入失敗。如果為1列,則唯寫入value,key的資料將被丟棄,如果為2列,則第1列和第2列分別對應key和value。Tuple Topic寫入位元據會存在亂碼問題,位元據建議寫入Blob Topic。
Blob Topic
Kafka的資料寫入Blob Topic時,會把Kafka資料的value寫入Blob中,如果Kafka資料的key不為NULL,則會寫入DataHub的Attribute,其中key為”__kafka_key__“,value為Kafka資料的key。
Header
Kafka的Header對應DataHub的Attribute,但是如果Kafka的Header的value為NULL,則會忽略掉對應的header。建議不要使用”__kafka_key__“作為Header的key。
Consumer Group
DataHub的消費組就是訂閱id,只能同時訂閱單個topic,而kafka的group可以同時訂閱多個topic,為了更好的相容kafka的訂閱者式,DataHub又提供了group的功能,使用者可以在project下建立group並綁定想要訂閱的topic,就可以使用該group訂閱這個project下的多個topic。DataHub的group本質上就是服務端內部封裝了DataHub的多個訂閱,如果group綁定了topic,使用者可以在topic頁面的訂閱列表頁面,看到由group自動建立的訂閱,刪除該訂閱會導致group無法訂閱該topic,並且之前的消費點位都會消失。
目前單個group限制最多可以訂閱50個topic,如果需要訂閱更多,請開工單聯絡我們。
Kafka配置參數
C=Consumer, P=Producer, S=Streams
參數 | C/P/S | 可選配置 | 是否必須 | 描述 |
bootstrap.servers | * | 參考Kafka網域名稱列表 | 是 | |
security.protocol | * | SASL_SSL | 是 | 為了保證資料轉送的安全性,Kafka寫入DataHub預設使用SSL加密傳輸 |
sasl.mechanism | * | PLAIN | 是 | AK認證方式,僅支援PLAIN |
compression.type | P | LZ4 | 否 | 是否開啟壓縮傳輸,目前僅支援LZ4 |
group.id | C | project.topic:subId 或者 project.group | 是 | 使用project.topic:subId時必須和訂閱的topic保持一致,否則無法讀取資料,推薦使用project.group |
partition.assignment.strategy | C | org.apache.kafka.clients.consumer.RangeAssignor | 否 | Kafka預設為RangeAssignor,並且DataHub目前只支援RangeAssignor,請不要修改此配置 |
session.timeout.ms | C/S | [60000, 180000] | 否 | kafka預設為10000, 但是因為DataHub限制最小為60000,所以這裡預設會變為60000 |
heartbeat.interval.ms | C/S | 建議session.timeout.ms的 2/3 | 否 | Kafka預設為3000,但是因為 |
application.id | S | project.topic:subId 或者 project.group | 是 | 使用project.topic:subId時必須和訂閱的topic保持一致,否則無法讀取資料,推薦使用project.group |
以上是使用Kafka用戶端寫入DataHub需要重點關注的參數,對於等用戶端相關的參數,行為沒有變化,例如:retries,batch.size;對於服務端相關參數不會對服務端行為有改變,例如:無論acks的值為多少,DataHub預設資料完全寫入成功之後才會返回。
Kafka網域名稱列表
地區 | Region | 外網Endpoint | 傳統網路ECS Endpoint | VPC ECS Endpoint |
華東1(杭州) | cn-hangzhou | dh-cn-hangzhou.aliyuncs.com:9092 | dh-cn-hangzhou.aliyun-inc.com:9093 | dh-cn-hangzhou-int-vpc.aliyuncs.com:9094 |
華東2(上海) | cn-shanghai | dh-cn-shanghai.aliyuncs.com:9092 | dh-cn-shanghai.aliyun-inc.com:9093 | dh-cn-shanghai-int-vpc.aliyuncs.com:9094 |
華北2(北京) | cn-beijing | dh-cn-beijing.aliyuncs.com:9092 | dh-cn-beijing.aliyun-inc.com:9093 | dh-cn-beijing-int-vpc.aliyuncs.com:9094 |
華南1(深圳) | cn-shenzhen | dh-cn-shenzhen.aliyuncs.com:9092 | dh-cn-shenzhen.aliyun-inc.com:9093 | dh-cn-shenzhen-int-vpc.aliyuncs.com:9094 |
華北3(張家口) | cn-zhangjiakou | dh-cn-zhangjiakou.aliyuncs.com:9092 | dh-cn-zhangjiakou.aliyun-inc.com:9093 | dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094 |
亞太地區東南1(新加坡) | ap-southeast-1 | dh-ap-southeast-1.aliyuncs.com:9092 | dh-ap-southeast-1.aliyun-inc.com:9093 | dh-ap-southeast-1-int-vpc.aliyuncs.com:9094 |
亞太地區東南3(吉隆坡) | ap-southeast-3 | dh-ap-southeast-3.aliyuncs.com:9092 | dh-ap-southeast-3.aliyun-inc.com:9093 | dh-ap-southeast-3-int-vpc.aliyuncs.com:9094 |
亞太地區南部1(孟買) 已關停 | ap-south-1 | dh-ap-south-1.aliyuncs.com:9092 | dh-ap-south-1.aliyun-inc.com:9093 | dh-ap-south-1-int-vpc.aliyuncs.com:9094 |
歐洲中部1(法蘭克福) | eu-central-1 | dh-eu-central-1.aliyuncs.com:9092 | dh-eu-central-1.aliyun-inc.com:9093 | dh-eu-central-1-int-vpc.aliyuncs.com:9094 |
上海金融雲 | cn-shanghai-finance-1 | dh-cn-shanghai-finance-1.aliyuncs.com:9092 | dh-cn-shanghai-finance-1.aliyun-inc.com:9093 | dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094 |
中國香港 | cn-hongkong | dh-cn-hongkong.aliyuncs.com:9092 | dh-cn-hongkong.aliyun-inc.com:9093 | dh-cn-hongkong-int-vpc.aliyuncs.com:9094 |
樣本
建立Topic樣本
頁面建立

代碼建立
注意:目前無法通過kafka的api建立topic,只能通過datahub的sdk建立,建立時需要指定ExpandMode為ONLY_EXTEND,maven依賴版本需為2.19.0或更高版本
您還需要在工程中配置相應的Access Key和Secret Key,推薦使用環境變數的形式在設定檔中配置。
datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。
強烈建議不要把AccessKey ID和AccessKey Secret儲存到工程代碼裡,否則可能導致AccessKey泄露,威脅您帳號下所有資源的安全。
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.19.0-public</version>
</dependency>@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateTopic {
public static void main(String[] args) {
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
new AliyunAccount(accessId, accessKey)))
.build();
int shardCount = 1;
int lifeCycle = 7;
try {
datahubClient.createTopic("test_project", "test_topic", shardCount, lifeCycle, RecordType.BLOB, "comment", ExpandMode.ONLY_EXTEND);
} catch (DatahubClientException e) {
e.printStackTrace();
}
}
}建立group樣本
頁面建立

建立完成後仍舊可以修改綁定的topic列表,所以這裡可以先任意選擇。

建立完成後,可以在topic的訂閱列表頁面看到group自動建立了訂閱。

代碼建立
maven依賴版本需為2.21.6-public或更高版本
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.21.6-public</version>
</dependency>@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateGroup {
public static void main(String[] args) {
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
new AliyunAccount(accessId, accessKey)))
.build();
List<String> topicList = new ArrayList<>();
topicList.add("test_project.topic1");
topicList.add("test_project.topic2");
topicList.add("test_project.topic3");
try {
// 建立kafka group
datahubClient.createKafkaGroup("test_project", "test_topic", "test comment");
// 將需要訂閱的topic綁定到group上
datahubClient.updateTopicsForKafkaGroup("test_project", "test_topic", topicList, UpdateKafkaGroupMode.ADD);
} catch (DatahubClientException e) {
e.printStackTrace();
}
}
}Producer樣本:
產生kafka_client_producer_jaas.conf檔案
建立檔案kafka_client_producer_jaas.conf,儲存到任意路徑,檔案內容如下。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};maven依賴
Kafka-client版本至少大於等於0.10.0.0,推薦2.4.0
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>範例程式碼
public class ProducerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "lz4");
String KafkaTopicName = "test_project.test_topic";
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
try {
List<Header> headers = new ArrayList<>();
RecordHeader header1 = new RecordHeader("key1", "value1".getBytes());
RecordHeader header2 = new RecordHeader("key2", "value2".getBytes());
headers.add(header1);
headers.add(header2);
ProducerRecord<String, String> record = new ProducerRecord<>(KafkaTopicName, 0, "key", "Hello DataHub!", headers);
// sync send
producer.send(record).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}運行結果
運行成功之後,可以再DataHub抽樣一下,確認是否正常DataHub。

Consumer樣本
產生kafka_client_producer_jaas.conf檔案和maven依賴參考Producer樣本。
新加入的consumer需要十幾秒左右分配shard,分配完成後即可消費。
範例程式碼
使用kafka group樣本(推薦)
package com.aliyun.datahub.kafka.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class ConsumerExample2 {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
// group.id填project.group
properties.put("group.id", "test_project.test_kafka_group");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topicList = new ArrayList<>();
topicList.add("test_project.test_topic1");
topicList.add("test_project.test_topic2");
topicList.add("test_project.test_topic3");
// 使用kafka group可以同時訂閱多個topic
kafkaConsumer.subscribe(topicList);
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
}
}
}使用project.topic.subid樣本
package com.aliyun.datahub.kafka.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
// group.id填project.topic.subId
properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 使用project.topic.subId的方式只能訂閱單個topic
kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
}
}
}運行結果
運行成功之後,便可以在終端看到讀取到的資料。
ConsumerRecord(topic = test_project.test_topic, partition = 0, leaderEpoch = 0, offset = 0, LogAppendTime = 1611040892661, serialized key size = 3, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false), key = key, value = Hello DataHub!)注意:這裡同一個請求返回的資料的LogAppendTime是相同的,是該請求返回所有的資料的寫入DataHub時間的最大值
Streams樣本
maven依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>程式碼範例
這裡讀取test_project下input的資料,將key和value的字串轉為小寫重新寫入output。
public class StreamExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(final String[] args) {
final String input = "test_project.input";
final String output = "test_project.output";
final Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("application.id", "test_project.input:1611293595417QH0WL");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("auto.offset.reset", "earliest");
final StreamsBuilder builder = new StreamsBuilder();
TestMapper testMapper = new TestMapper();
builder.stream(input, Consumed.with(Serdes.String(), Serdes.String()))
.map(testMapper)
.to(output, Produced.with(Serdes.String(), Serdes.String()));
final KafkaStreams streams = new KafkaStreams(builder.build(), properties);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (final Throwable e) {
System.exit(1);
}
System.exit(0);
}
static class TestMapper implements KeyValueMapper<String, String, KeyValue<String, String>> {
@Override
public KeyValue<String, String> apply(String s, String s2) {
return new KeyValue<>(StringUtils.lowerCase(s), StringUtils.lowerCase(s2));
}
}
}運行結果
啟動Streams任務之後,分配shard大概需要1分鐘左右,1分鐘之後就可以在控制台看到當前的task數量,task數量和輸入topic的shard數量保持一致,樣本輸入topic為3個shard。
currently assigned active tasks: [0_0, 0_1, 0_2]
currently assigned standby tasks: []
revoked active tasks: []
revoked standby tasks: []shard分配成功之後,可以向input中寫入一組測試資料 (AAAA,BBBB),(CCCC,DDDD),(EEEE,FFFF),之後再output抽樣一下,查看資料是否正確寫入。

注意事項
目前不支援事務、等冪
目前Kafka用戶端無法自動建立DataHub Topic,寫入之前需要保證已建立Topic
Consumer目前最多隻可以訂閱一個topic
Consumer讀取的資料時間戳記均為LogAppendTime,表示DataHub的落盤時間,單個請求返回的所有資料時間戳記相同,為所有資料時間戳記的最大值,所以如果讀取的時間戳記可能會大於實際的落盤時間
Streams輸入topic目前僅支援一個,輸出可以多個topic
Streams目前只支援無狀態的任務。
支援Kafka版本為0.10.0 -> 2.4.0
常見問題
Q: 寫入資料時串連斷開
Selector - [Producer clientId=producer-1] Connection with dh-cn-shenzhen.aliyuncs.com disconnected
java.io.EOFException
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:573)
...A: Kafka meta請求和寫資料請求不是一個串連,第一次meta請求會請求建立一個串連,然後寫資料時會重新和meta中的返回的broker重建立立一個串連,並且之後所有的請求都是在第二個串連上發送,因此第一個串連就會閑置,服務端會主動關閉閑置超過一定時間的串連,因此如果這個錯誤並沒有影像資料的正常寫入,直接忽略即可。
Q: 啟動kafka用戶端失敗
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address 100.67.134.161 foundA: 添加配置properties.put("ssl.endpoint.identification.algorithm", "");。
Q: Consumer消費過程中出現DisconnectException
[INFO][Consumer clientId=client-id, groupId=consumer-project.topic:subid] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: {}.
org.apache.kafka.common.errors.DisconnectExceptionA: Kafka的用戶端需要與服務端保持TCP長串連,一般情況是因為網路抖動造成的,用戶端有重試邏輯,因此不會對用戶端的消費造成影響。