Kafka連接器支援讀取Protobuf格式的資料。
Protobuf格式
Protobuf(Protocol Buffers)是由Google開發的一種高效、跨語言、結構化的資料序列化格式。相比於 JSON 和 XML,它具有以下顯著優勢:
體積小:序列化後的資料更緊湊,節省儲存與網路傳輸資源。
速度快:序列化與還原序列化效率高,適合高效能情境。
結構化定義:通過
.proto檔案定義資料結構,介面清晰,易於維護。跨語言支援強 :支援主流程式設計語言,便於多系統間資料互動。
因此,Protobuf 被廣泛應用於高頻通訊、微服務、Realtime Compute等情境,是 Kafka 中推薦使用的高效資料格式之一。
使用限制
僅支援Protocol Buffers 21.7及以下版本。
步驟一:編譯Protobuf檔案
建立名為order.proto的Protobuf檔案。
proto3
syntax = "proto3"; // proto 的邏輯package名稱 (用於在其他 .proto 檔案中 import 引用此檔案) package com.aliyun; // java package名稱,如果不指定會預設直接使用上面 proto 的 package option java_package = "com.aliyun"; // 是否編譯成多個檔案 (建議為 true,這樣每個 Message 會產生獨立的 .java 檔案,而不是作為內部類) option java_multiple_files = true; // java 的輸出類名 (當 java_multiple_files = true 時,這個類主要用於包含檔案名稱等中繼資料,不再包裹 Message 類) option java_outer_classname = "OrderProtoBuf"; message Order { // Proto3 移除 optional/required 關鍵字。 // 注意:基礎資料類型(int, long, double) 預設為 0,string 預設為空白字串。 // Java 中將不再產生 hasOrderId() 方法,無法區分"未賦值"和"賦值為0"。 int32 orderId = 1; string orderName = 2; double orderPrice = 3; int64 orderDate = 4; }proto2
syntax = "proto2"; // proto 的package名稱package com.aliyun; // java package名稱,如果不指定會預設用proto的packageoption java_package = "com.aliyun"; // 是否編譯成多個檔案option java_multiple_files = true; java的封裝類的類名option java_outer_classname = "OrderProtoBuf"; message Order { optional int32 orderId = 1; optional string orderName= 2; optional double orderPrice = 3; optional int64 orderDate = 4; }使用Protocol Buffers工具產生原始碼。
建立一個Maven空專案,將Protobuf檔案放到src/main/proto目錄下。
目錄樣本
KafkaProtobuf ‒ src -manin -java -proto -order.proto ‒ pom.xmlpom.xml
說明需要和Flink中依賴的版本對齊(如3.21.7),否則可能產生的類和protobuf-java:3.21.7衝突。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.aliyun</groupId> <artifactId>KafkaProtobuf</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.21.7</version> <!-- 版本號碼需與產生代碼時使用的 Protobuf 版本一致 --> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.3.1</version> <!-- 請根據你的 Kafka 版本進行調整 --> </dependency> <dependency> <groupId>com.github.javafaker</groupId> <artifactId>javafaker</artifactId> <version>1.0.2</version> </dependency> </dependencies> <build> <finalName>KafkaProtobuf</finalName> <plugins> <!-- Java 編譯器--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.13.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <!-- 我們使用maven-shade 建立一個包含所有必須依賴的 fat jar --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.5.3</version> </plugin> </plugins> </build> </project>在java目錄下會產生三個類,分別是Order具體類,OrderOrBuilder介面類和OrderProtobuf外部封裝類。
# 終端進入專案根目錄下(pom.xml所在目錄)使用命令產生原始碼 protoc --java_out=src/main/java --proto_path=src/main/proto src/main/proto/order.proto序列化和還原序列化測試。
package com.aliyun; public class OrderTest { public static void main(String[] args) { // 建立一個Order對象並設定欄位值 Order order = Order.newBuilder() .setOrderId(8513) .setOrderName("flink") .setOrderPrice(99.99) .setOrderDate(System.currentTimeMillis()) .build(); // 序列化為位元組數組 byte[] serializedBytes = order.toByteArray(); System.out.println("序列化後位元組長度:"+ serializedBytes.length); // 還原序列化為新的Order對象 Order deserializedOrder; try { deserializedOrder = Order.parseFrom(serializedBytes); } catch (Exception e) { System.err.println("還原序列化失敗: " + e.getMessage()); return; } System.out.println("原始對象: \n" + order); // 驗證還原序列化後的對象與原始對象的欄位值是否一致 if (order.getOrderId() == deserializedOrder.getOrderId() && order.getOrderName().equals(deserializedOrder.getOrderName()) && order.getOrderPrice() == deserializedOrder.getOrderPrice() && order.getOrderDate() == deserializedOrder.getOrderDate()) { System.out.println("序列化和還原序列化測試通過!"); } else { System.out.println("序列化和還原序列化測試失敗!"); } } }
步驟二:構建測試資料,寫入Kafka
本樣本以雲訊息佇列Kafka為作業環境。
下載SSL根憑證。如果是SSL存取點,需下載該認證。
username和password為執行個體的使用者名稱和密碼。
如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台的執行個體詳情頁面配置資訊地區擷取預設使用者的使用者名稱和密碼。
如果執行個體已開啟ACL,請確保要使用的SASL使用者為PLAIN類型且已授權收發訊息的許可權,詳情請參見使用ACL功能進行存取控制。
package com.aliyun;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import com.github.javafaker.Faker; // 引入Faker庫產生隨機測試資料集
public class ProtoBufToKafkaTest {
public static void main(String[] args) {
Properties props = new Properties();
// 設定存取點,請通過Kafka控制台擷取對應Topic的存取點
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap_servers>");
// 接入協議,使用SASL_SSL協議接入。
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
// 設定SSL根憑證的路徑(絕對路徑),該檔案不能被打包到Jar當中
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "../only.4096.client.truststore.jks");
// 根憑證store的密碼,保持不變。
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
// SASL鑒權方式,保持不變。
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// 雲訊息佇列 Kafka 版訊息的序列化方式。key/value的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// 請求的最長等待時間。單位毫秒。
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
// 設定用戶端內部重試次數。
props.put(ProducerConfig.RETRIES_CONFIG, 5);
// 設定用戶端內部稍候再試。單位毫秒。
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
// Hostname校正改成空。
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"aliyun_flink\" password=\"123456\";");
// 構造Producer對象,注意,該對象是安全執行緒的,一般來說,一個進程內一個Producer對象即可。
// 如果想提高效能,可以多構造幾個對象,但不要太多,最好不要超過5個。
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
String topic = "test";
// 新增:建立一個列表用於儲存三條訊息
List<ProducerRecord<String, byte[]>> messages = new ArrayList<>();
for (int i = 0; i < 3; i++) {
byte[] value = getProtoTestData();
ProducerRecord<String, byte[]> kafkaMessage = new ProducerRecord<>(topic, value);
messages.add(kafkaMessage);
}
try {
// 批量發送訊息
List<Future<RecordMetadata>> futures = new ArrayList<>();
for (ProducerRecord<String, byte[]> message : messages) {
Future<RecordMetadata> metadataFuture = producer.send(message);
futures.add(metadataFuture);
}
producer.flush();
// 同步擷取Future對象的結果
for (Future<RecordMetadata> future : futures) {
try {
RecordMetadata recordMetadata = future.get();
System.out.println("Produce ok:" + recordMetadata.toString());
} catch (Throwable t) {
t.printStackTrace();
}
}
} catch (Exception e) {
// 用戶端內部重試之後,仍然發送失敗,業務要應對此類錯誤。
System.out.println("error occurred");
e.printStackTrace();
}
}
private static byte[] getProtoTestData() {
// 使用Faker產生隨機資料
Faker faker = new Faker();
int orderId = faker.number().numberBetween(1000, 9999); // 隨機產生訂單ID
String orderName = faker.commerce().productName(); // 隨機產生訂單名稱
double orderPrice = faker.number().randomDouble(2, 10, 1000); // 隨機產生訂單價格
long orderDate = System.currentTimeMillis(); // 目前時間作為訂單日期
// 按照定義的資料結構,建立一個對象。
Order order = Order.newBuilder()
.setOrderId(orderId)
.setOrderName(orderName)
.setOrderPrice(orderPrice)
.setOrderDate(orderDate)
.build();
// 發送資料序列化:將對象資料轉化為位元組資料輸出
return order.toByteArray();
}
}運行測試代碼,向Kafka的test Topic寫入三條Protobuf格式的資料。

步驟三:編譯打包上傳
上傳編譯打包好的KafkaProtobuf.jar。

僅VVR 8.0.9及以上版本使用內建的Protobuf資料格式。如果低於此版本,需要額外添加flink-protobuf-1.17.2.jar依賴檔案。
步驟四:Flink SQL讀取資料
SQL樣本參考。
添加
protobuf.message-class-name參數指定訊息體對應的message類,更多protobuf參數詳情請參見Flink-Protobuf。CREATE TEMPORARY TABLE KafkaSource ( orderId INT, orderName STRING, orderPrice DOUBLE, orderDate BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'test', 'properties.group.id' = 'my-group', -- 消費者組ID 'properties.bootstrap.servers' = '<bootstrap_servers>', --填入對應的Kafka broker地址。 'format' = 'protobuf', -- value部分時使用的資料格式 'protobuf.message-class-name' = 'com.aliyun.Order', -- 指定訊息體對應的message類 'scan.startup.mode' = 'earliest-offset' -- 從Kafka最早分區開始讀取。 ); CREATE TEMPORARY TABLE KafkaSink ( orderId INT, orderName STRING, orderPrice DOUBLE, orderDate BIGINT ) WITH ( 'connector' = 'print' ); INSERT INTO KafkaSink SELECT * FROM KafkaSource ;附加檔案依賴引用。

SQL調試。

作業部署運行後查看輸出。

常見問題
上遊讀取後,下遊寫入Kafka其他Topic,日誌資訊中出現大量warning報
CORRUPT_MESSAGE。問題原因:阿里雲訊息佇列Kafka(非專業高寫版建立的Local儲存的Topic)不支援等冪和事務寫入,您將無法使用Kafka結果表提供的精確一次語義exactly-once semantic功能。
解決方案:需要在結果表中添加配置項
properties.enable.idempotence=false以關閉等冪寫入功能。作業運行時日誌報錯:
NoClassDefFoundError。問題原因:上傳的protobuf-java的Jar包的版本和Protocol Buffers編譯的不一致。
解決方案:檢查附加依賴檔案的版本是否一致,是否有檔案缺少,編譯打包的完整性。
作業檢查報錯:
Could not find any factory for identifier 'protobuf' that implements one of 'org.apache.flink.table.factories.EncodingFormatFactory。問題原因:僅支援VVR 8.0.9及以上版本使用內建的Protobuf資料格式。
解決方案:檢查是否有添加
flink-protobuf依賴。