Realtime Compute for Apache Flink: Processing Protobuf-formatted data with Kafka

Last Updated: May 09, 2025

The Kafka connector supports reading data in the Protobuf format.

Protobuf format

Protobuf (Protocol Buffers) is an efficient, cross-language format for serializing structured data, developed by Google. Compared with JSON and XML, it offers the following significant advantages:

  • Small size: Serialized data is more compact, saving storage and network transmission resources.

  • Fast speed: High efficiency in serialization and deserialization, suitable for high-performance scenarios.

  • Structured definition: Data structures are defined through .proto files, with clear interfaces that are easy to maintain.

  • Strong cross-language support: Supports mainstream programming languages, facilitating data interaction between multiple systems.

Therefore, Protobuf is widely used in scenarios such as high-frequency communication, microservices, and real-time computing, and is one of the recommended efficient data formats for Kafka.

Limits

  • Only source code generated from Protobuf files that use the proto2 syntax is supported.

  • Only Protocol Buffers 21.7 (protobuf-java 3.21.7) and earlier versions are supported.

Step 1: Compile Protobuf files

  1. Create a Protobuf file named order.proto.

    syntax = "proto2";
    // package name of proto
    package com.aliyun;
    // java package name, if not specified, proto's package will be used by default
    option java_package = "com.aliyun";
    // whether to compile into multiple files
    option java_multiple_files = true;
    // class name of java wrapper class
    option java_outer_classname = "OrderProtoBuf";
    message Order {
      optional int32 orderId = 1;
      optional string orderName = 2;
      optional double orderPrice = 3;
      optional int64 orderDate = 4;
    }
  2. Use the Protocol Buffers compiler (protoc) to generate source code.

    Create an empty Maven project and place the Protobuf file in the src/main/resources directory.

    Example directories

    KafkaProtobuf
    ‒ src
      -main
        -java
        -resources
          -order.proto
    ‒ pom.xml

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.aliyun</groupId>
        <artifactId>KafkaProtobuf</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>3.21.7</version> <!-- The version must be consistent with the Protobuf version used to generate the code -->
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>3.3.1</version> <!-- Please adjust according to your Kafka version -->
            </dependency>
            
            <dependency>
                <groupId>com.github.javafaker</groupId>
                <artifactId>javafaker</artifactId>
                <version>1.0.2</version>
            </dependency>
        </dependencies>
    
        <build>
            <finalName>KafkaProtobuf</finalName>
            <plugins>
                <!-- Java compiler -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.13.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <!-- Use maven-shade-plugin to create a fat JAR file that contains all the necessary dependencies. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.5.3</version>
                    <executions>
                        <execution>
                            <!-- Bind the shade goal to the package phase so that mvn package produces the fat JAR -->
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    
    </project>
    

    After you run the following command, three classes are generated in the src/main/java directory: the Order message class, the OrderOrBuilder interface, and the OrderProtoBuf outer wrapper class.

    # In the terminal, navigate to the project directory and use the command to generate source code
    protoc --java_out=src/main/java src/main/resources/order.proto
  3. Test serialization and deserialization.

    package com.aliyun;
    
    public class OrderTest {
        public static void main(String[] args) {
            // Create an Order object and set field values
            Order order = Order.newBuilder()
                    .setOrderId(8513)
                    .setOrderName("flink")
                    .setOrderPrice(99.99)
                    .setOrderDate(System.currentTimeMillis())
                    .build();
    
            // Serialize to byte array
            byte[] serializedBytes = order.toByteArray();
            System.out.println("Serialized byte length:"+ serializedBytes.length);
            // Deserialize to a new Order object
            Order deserializedOrder;
            try {
                deserializedOrder = Order.parseFrom(serializedBytes);
            } catch (Exception e) {
                System.err.println("Deserialization failed: " + e.getMessage());
                return;
            }
    
            System.out.println("Original object: \n" + order);
            // Verify that the field values of the deserialized object match those of the original object
            if (order.getOrderId() == deserializedOrder.getOrderId() &&
                order.getOrderName().equals(deserializedOrder.getOrderName()) &&
                order.getOrderPrice() == deserializedOrder.getOrderPrice() &&
                order.getOrderDate() == deserializedOrder.getOrderDate()) {
                System.out.println("Serialization and deserialization test passed!");
            } else {
                System.out.println("Serialization and deserialization test failed!");
            }
    
        }
    }

Step 2: Build test data and write to Kafka

This example uses ApsaraMQ for Kafka as the operating environment.

  1. Download the SSL root certificate. If you use an SSL endpoint, you must install this certificate.

  2. Set username and password to the username and password of the instance.

    • If the ACL feature is not enabled for the instance, you can obtain the username and password of the default user from the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    • If the ACL feature is enabled for the instance, make sure that the SASL user you want to use is of the PLAIN type and has been authorized to send and receive messages. For more information, see Use the ACL feature for access control.

package com.aliyun;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import com.github.javafaker.Faker; // Import Faker library to generate random test dataset

public class ProtoBufToKafkaTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Set the endpoint. Obtain the endpoint of the corresponding topic from the Kafka console
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap_servers>");
        // Access protocol, use SASL_SSL protocol for access.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // Set the path of the SSL root certificate (absolute path). This file cannot be packaged into the JAR.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "../only.4096.client.truststore.jks");
        // Password for the root certificate store, keep unchanged.
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        // SASL authentication method, keep unchanged.
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Serialization methods for the message key and value.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Maximum waiting time for requests. In milliseconds.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // Set the number of internal retries for the client.
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // Set the internal retry interval for the client. In milliseconds.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        // Set Hostname verification to empty.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"aliyun_flink\" password=\"123456\";");
        // Construct a Producer object. Note that this object is thread-safe. Generally, one Producer object per process is sufficient.
        // If you want to improve performance, you can construct multiple objects, but not too many, preferably no more than 5.
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        String topic = "test";

        // Create a list to hold the three messages
        List<ProducerRecord<String, byte[]>> messages = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            byte[] value = getProtoTestData();
            ProducerRecord<String, byte[]> kafkaMessage = new ProducerRecord<>(topic, value);
            messages.add(kafkaMessage);
        }

        try {
            // Send messages in batch
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            for (ProducerRecord<String, byte[]> message : messages) {
                Future<RecordMetadata> metadataFuture = producer.send(message);
                futures.add(metadataFuture);
            }
            producer.flush();

            // Synchronously get the results of the Future objects
            for (Future<RecordMetadata> future : futures) {
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            // If sending still fails after the client's internal retries, the application must handle the error.
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }

    private static byte[] getProtoTestData() {
        // Use Faker to generate random data
        Faker faker = new Faker();
        int orderId = faker.number().numberBetween(1000, 9999); // Randomly generate order ID
        String orderName = faker.commerce().productName(); // Randomly generate order name
        double orderPrice = faker.number().randomDouble(2, 10, 1000); // Randomly generate order price
        long orderDate = System.currentTimeMillis(); // Current time as order date

        // Create an object according to the defined data structure.
        Order order = Order.newBuilder()
                .setOrderId(orderId)
                .setOrderName(orderName)
                .setOrderPrice(orderPrice)
                .setOrderDate(orderDate)
                .build();

        // Data serialization for sending: Convert object data to byte data output
        return order.toByteArray();
    }
}
  3. Run the test code to write three Protobuf-formatted messages to the test topic in Kafka. To verify the written data outside of Flink, see the consumer sketch after this step.

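The following is a minimal consumer sketch for verifying the written messages outside of Flink. It decodes each value with the generated Order class and is only an illustration: it reuses the SASL_SSL connection settings from the producer example above, and the endpoint, truststore path, username, password, and consumer group are placeholders that you must replace with your own values.

package com.aliyun;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ProtoBufFromKafkaTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Reuse the same connection settings as the producer example. Replace the placeholders with your own values.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap_servers>");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "<truststore_path>");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";");
        // Read the raw bytes of each message and decode them with the generated Order class.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "<consumer_group>");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            // A single poll is enough for this quick verification; production code would poll in a loop.
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, byte[]> record : records) {
                // Deserialize the value back into an Order object and print it.
                Order order = Order.parseFrom(record.value());
                System.out.println("offset=" + record.offset() + ", order:\n" + order);
            }
        }
    }
}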

Step 3: Compile, package, and upload

You need to upload protobuf-java-3.21.7.jar and the compiled and packaged KafkaProtobuf.jar as additional dependency files.

Note

Only VVR 8.0.9 and later versions provide the built-in Protobuf data format. If you are using an earlier version, you also need to add the flink-protobuf-1.17.2.jar dependency file.


Step 4: Read data with Flink SQL

  1. SQL example.

    Add the protobuf.message-class-name parameter to specify the message class corresponding to the message body. For more protobuf parameter details, see Flink-Protobuf.

    CREATE TEMPORARY TABLE KafkaSource (
      orderId INT,
      orderName STRING,
      orderPrice DOUBLE,
      orderDate BIGINT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test',
      'properties.group.id' = 'my-group',  -- Consumer group ID
      'properties.bootstrap.servers' = 'alikafka-serverless-cn-gh647pzkq03-1000-vpc.alikafka.aliyuncs.com:9092,alikafka-serverless-cn-gh647pzkq03-2000-vpc.alikafka.aliyuncs.com:9092,alikafka-serverless-cn-gh647pzkq03-3000-vpc.alikafka.aliyuncs.com:9092',  -- Replace with the addresses of your Kafka brokers.
      'format' = 'protobuf',              -- Data format used for the value part
      'protobuf.message-class-name' = 'com.aliyun.Order',    -- Specify the message class corresponding to the message body                       
      'scan.startup.mode' = 'earliest-offset'  -- Start reading from the earliest offset of each Kafka partition.
    );
    
    CREATE TEMPORARY TABLE KafkaSink (
      orderId INT,
      orderName STRING,
      orderPrice DOUBLE,
      orderDate BIGINT
    ) WITH (
        'connector' = 'print'
    );
    
    INSERT INTO KafkaSink
    SELECT * FROM KafkaSource;
  2. Add the uploaded JAR files as additional file dependencies.


  3. Debug the SQL.


  4. View the output after the job is deployed and running.


FAQ

  • The debug check reports the error Bad return type.

    Cause: The source code was generated from a Protobuf file that uses the proto3 syntax.

    Solution: Change the file to the proto2 syntax, regenerate and repackage the code, and upload it again.

  • After reading from the upstream topic, writing to another Kafka topic downstream produces numerous warning logs that report CORRUPT_MESSAGE.

    Cause: ApsaraMQ for Kafka topics that use local storage and are not created with the Professional High Write edition do not support idempotent or transactional writes, so the exactly-once semantics provided by the Kafka result table cannot be used.

    Solution: Add the configuration item properties.enable.idempotence=false to the result table to disable idempotent writes.

  • The job runtime log reports the error NoClassDefFoundError.

    Cause: The version of the uploaded protobuf-java JAR file does not match the Protocol Buffers version used to generate the code.

    Solution: Check that the versions of the additional dependency files are consistent, that no files are missing, and that the project was compiled and packaged completely.

  • The job check reports the error: Could not find any factory for identifier 'protobuf' that implements 'org.apache.flink.table.factories.EncodingFormatFactory'.

    Cause: Only VVR 8.0.9 and above versions support the built-in Protobuf data format.

    Solution: For earlier versions, check whether the flink-protobuf dependency file has been added.