The Kafka connector supports reading data in Protobuf format.
Protobuf format
Protobuf (Protocol Buffers) is a high-efficiency, cross-language, structured data serialization format developed by Google. Compared to JSON and XML, it offers the following significant advantages:
Small size: Serialized data is more compact, saving storage and network transmission resources.
Fast speed: High efficiency in serialization and deserialization, suitable for high-performance scenarios.
Structured definition: Data structures are defined in .proto files, with clear interfaces that are easy to maintain.
Strong cross-language support: Supports mainstream programming languages, facilitating data exchange between systems.
Therefore, Protobuf is widely used in high-frequency communication, microservices, real-time computing, and similar scenarios, making it one of the recommended efficient data formats for Kafka.
Limits
Only source code generated from Protobuf files defined with proto2 syntax is supported.
Only Protocol Buffers 21.7 and earlier versions are supported.
Step 1: Compile Protobuf files
Create a Protobuf file named order.proto.
syntax = "proto2"; // package name of proto package com.aliyun; // java package name, if not specified, proto's package will be used by default option java_package = "com.aliyun"; // whether to compile into multiple files option java_multiple_files = true; // class name of java wrapper class option java_outer_classname = "OrderProtoBuf"; message Order { optional int32 orderId = 1; optional string orderName= 2; optional double orderPrice = 3; optional int64 orderDate = 4; }
Use the Protocol Buffers compiler (protoc) to generate the source code.
Create an empty Maven project and place the Protobuf file in the resources directory.
Example directories
KafkaProtobuf
├── src
│   └── main
│       ├── java
│       └── resources
│           └── order.proto
└── pom.xml
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun</groupId>
    <artifactId>KafkaProtobuf</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.21.7</version> <!-- The version must be consistent with the Protobuf version used to generate the code -->
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.3.1</version> <!-- Adjust according to your Kafka version -->
        </dependency>
        <dependency>
            <groupId>com.github.javafaker</groupId>
            <artifactId>javafaker</artifactId>
            <version>1.0.2</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>KafkaProtobuf</finalName>
        <plugins>
            <!-- Java compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Use maven-shade-plugin to create a fat JAR file that contains all the necessary dependencies. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.3</version>
            </plugin>
        </plugins>
    </build>
</project>
# In the terminal, navigate to the project root directory and run the following command to generate the source code
protoc --java_out=src/main/java src/main/resources/order.proto

Three classes are generated in the java directory: the Order concrete class, the OrderOrBuilder interface, and the OrderProtoBuf outer wrapper class.
Serialization and deserialization test.
package com.aliyun;

public class OrderTest {
    public static void main(String[] args) {
        // Create an Order object and set field values
        Order order = Order.newBuilder()
                .setOrderId(8513)
                .setOrderName("flink")
                .setOrderPrice(99.99)
                .setOrderDate(System.currentTimeMillis())
                .build();

        // Serialize to a byte array
        byte[] serializedBytes = order.toByteArray();
        System.out.println("Serialized byte length:" + serializedBytes.length);

        // Deserialize to a new Order object
        Order deserializedOrder;
        try {
            deserializedOrder = Order.parseFrom(serializedBytes);
        } catch (Exception e) {
            System.err.println("Deserialization failed: " + e.getMessage());
            return;
        }
        System.out.println("Original object: \n" + order);

        // Verify that the field values of the deserialized object match those of the original object
        if (order.getOrderId() == deserializedOrder.getOrderId()
                && order.getOrderName().equals(deserializedOrder.getOrderName())
                && order.getOrderPrice() == deserializedOrder.getOrderPrice()
                && order.getOrderDate() == deserializedOrder.getOrderDate()) {
            System.out.println("Serialization and deserialization test passed!");
        } else {
            System.out.println("Serialization and deserialization test failed!");
        }
    }
}
Step 2: Build test data and write to Kafka
This example uses ApsaraMQ for Kafka as the operating environment.
Download the SSL root certificate. If you use an SSL endpoint, you must install this certificate.
The username and password are those of your ApsaraMQ for Kafka instance.
If ACL is not enabled for the instance, you can obtain the username and password of the default user in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.
If ACL is enabled for the instance, make sure that the SASL user you want to use is of the PLAIN type and has been authorized to send and receive messages. For more information, see Use the ACL feature for access control.
package com.aliyun;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import com.github.javafaker.Faker; // Faker library for generating random test data

public class ProtoBufToKafkaTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Set the endpoint. Obtain the endpoint of the corresponding topic from the Kafka console.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap_servers>");
        // Access protocol. Use the SASL_SSL protocol for access.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // Path (absolute) of the SSL root certificate. This file cannot be packaged into the JAR.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "../only.4096.client.truststore.jks");
        // Password of the root certificate store. Keep it unchanged.
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        // SASL authentication mechanism. Keep it unchanged.
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Serializers for the message key and value.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Maximum waiting time for requests, in milliseconds.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // Number of internal retries for the client.
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // Internal retry interval for the client, in milliseconds.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        // Set hostname verification to empty.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        // SASL credentials. Replace the username and password with those of your instance.
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"aliyun_flink\" password=\"123456\";");

        // Construct a Producer object. This object is thread-safe; one Producer object per process is generally sufficient.
        // To improve performance, you can construct several objects, but preferably no more than 5.
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        String topic = "test";
        // Create a list to hold three messages.
        List<ProducerRecord<String, byte[]>> messages = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            byte[] value = getProtoTestData();
            ProducerRecord<String, byte[]> kafkaMessage = new ProducerRecord<>(topic, value);
            messages.add(kafkaMessage);
        }

        try {
            // Send the messages in a batch.
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            for (ProducerRecord<String, byte[]> message : messages) {
                Future<RecordMetadata> metadataFuture = producer.send(message);
                futures.add(metadataFuture);
            }
            producer.flush();
            // Synchronously get the results of the Future objects.
            for (Future<RecordMetadata> future : futures) {
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            // If sending still fails after the client's internal retries, the application must handle the error.
            System.out.println("error occurred");
            e.printStackTrace();
        } finally {
            // Release the producer's resources.
            producer.close();
        }
    }

    private static byte[] getProtoTestData() {
        // Use Faker to generate random data.
        Faker faker = new Faker();
        int orderId = faker.number().numberBetween(1000, 9999);       // Random order ID
        String orderName = faker.commerce().productName();            // Random order name
        double orderPrice = faker.number().randomDouble(2, 10, 1000); // Random order price
        long orderDate = System.currentTimeMillis();                  // Current time as the order date

        // Create an object according to the defined data structure.
        Order order = Order.newBuilder()
                .setOrderId(orderId)
                .setOrderName(orderName)
                .setOrderPrice(orderPrice)
                .setOrderDate(orderDate)
                .build();

        // Serialize the object into bytes for sending.
        return order.toByteArray();
    }
}
Run the test code to write three Protobuf-formatted records to the test topic in Kafka.
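To verify that the data was written, you can read the messages back and parse each value with Order.parseFrom. The following is a minimal consumer sketch, assuming the same connection settings as the producer above; the endpoint placeholder, username, password, and the protobuf-verify group ID are illustrative values that you must replace with your own.

package com.aliyun;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ProtoBufFromKafkaTest {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Connection settings mirror the producer example; replace the placeholders with your own values.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap_servers>");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "../only.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        // Replace with the username and password of your instance.
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";");
        // Consume raw bytes; each value is a serialized Order message.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-verify");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            // Poll once and deserialize the Protobuf bytes back into Order objects.
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, byte[]> record : records) {
                Order order = Order.parseFrom(record.value());
                System.out.println("Consumed order: " + order);
            }
        }
    }
}

If the consumer prints the three orders generated above, the Protobuf payloads were written correctly.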
Step 3: Compile, package, and upload
You need to upload protobuf-java-3.21.7.jar and the compiled and packaged KafkaProtobuf.jar.
Only VVR 8.0.9 and later support the built-in Protobuf data format. If you use an earlier version, you must add the flink-protobuf-1.17.2.jar dependency file.
Step 4: Read data with Flink SQL
SQL example reference.
Add the protobuf.message-class-name parameter to specify the message class that corresponds to the message body. For more details about the protobuf parameters, see Flink-Protobuf.

CREATE TEMPORARY TABLE KafkaSource (
  orderId INT,
  orderName STRING,
  orderPrice DOUBLE,
  orderDate BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.group.id' = 'my-group', -- Consumer group ID
  'properties.bootstrap.servers' = 'alikafka-serverless-cn-gh647pzkq03-1000-vpc.alikafka.aliyuncs.com:9092,alikafka-serverless-cn-gh647pzkq03-2000-vpc.alikafka.aliyuncs.com:9092,alikafka-serverless-cn-gh647pzkq03-3000-vpc.alikafka.aliyuncs.com:9092', -- Enter the address of your Kafka broker
  'format' = 'protobuf', -- Data format used for the value part
  'protobuf.message-class-name' = 'com.aliyun.Order', -- Message class corresponding to the message body
  'scan.startup.mode' = 'earliest-offset' -- Start reading from the earliest offset of each Kafka partition
);

CREATE TEMPORARY TABLE KafkaSink (
  orderId INT,
  orderName STRING,
  orderPrice DOUBLE,
  orderDate BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO KafkaSink
SELECT * FROM KafkaSource;
Additional file dependency references.
SQL debugging.
View the output after the job is deployed and running.
FAQ
The debugging check reports the error Bad return type.
Cause: The source code was compiled from a Protobuf file that uses proto3 syntax.
Solution: Change to proto2 syntax, recompile, and upload again.

After reading from the upstream topic, writing to other Kafka topics downstream produces numerous warning logs that report CORRUPT_MESSAGE.
Cause: ApsaraMQ for Kafka (topics that use local storage and are not created with the Professional High Write edition) does not support idempotent or transactional writes, so the exactly-once semantics provided by the Kafka result table cannot be used.
Solution: Add the configuration item properties.enable.idempotence=false to the result table to disable idempotent writes.

The job runtime log reports the error NoClassDefFoundError.
Cause: The version of the uploaded protobuf-java JAR file is inconsistent with the Protocol Buffers version used to compile the code.
Solution: Check that the versions of the additional dependency files are consistent, that no files are missing, and that the project was compiled and packaged completely.

The job check reports the error Could not find any factory for identifier 'protobuf' that implements one of 'org.apache.flink.table.factories.EncodingFormatFactory'.
Cause: Only VVR 8.0.9 and later support the built-in Protobuf data format.
Solution: Check whether the flink-protobuf dependency has been added.