
Realtime Compute for Apache Flink: Process Protobuf data using the Kafka connector

Last Updated: Dec 18, 2025

The Kafka connector supports reading data in Protobuf format.

Protobuf format

Protocol Buffers (Protobuf) is an efficient, language-neutral, and structured data serialization format developed by Google. Compared to JSON and XML, Protobuf offers the following advantages:

  • Small size: The serialized data is more compact, saving storage and network transmission resources.

  • High speed: Serialization and deserialization are highly efficient, which makes it suitable for high-performance scenarios.

  • Structured definition: You can define the data structure in a .proto file. This provides a clear and easy-to-maintain interface.

  • Strong cross-language support: It supports mainstream programming languages, which facilitates data exchange between multiple systems.

As a result, Protobuf is widely used in scenarios such as high-frequency communication, microservices, and real-time computing. It is one of the recommended high-efficiency data formats in Kafka.

Limits

Only Protocol Buffers 21.7 and earlier versions are supported.

Step 1: Compile the Protobuf file

  1. Create a Protobuf file named order.proto.

    proto3

    syntax = "proto3";
    // The logical package name of the proto file, used to import this file in other .proto files.
    package com.aliyun;
    // The Java package name. If not specified, the proto package name is used by default.
    option java_package = "com.aliyun";
    // Specifies whether to compile into multiple files. We recommend that you set this to true so that each Message generates a separate .java file instead of an inner class.
    option java_multiple_files = true;
    // The output class name in Java. When java_multiple_files is true, this class mainly contains metadata, such as the filename, and no longer wraps the Message class.
    option java_outer_classname = "OrderProtoBuf";
    message Order {
      // In Proto3, the optional and required keywords are removed.
      // Note: The default value for basic data types such as int, long, and double is 0. The default value for a string is an empty string.
      // In Java, the hasOrderId() method is no longer generated. This makes it impossible to distinguish between a field that is not set and a field that is set to 0.
      int32 orderId = 1;
      string orderName = 2;
      double orderPrice = 3;
      int64 orderDate = 4;
    }

    proto2

    syntax = "proto2";
    // The package name of the proto file.
    package com.aliyun;
    // The Java package name. If not specified, the proto package name is used by default.
    option java_package = "com.aliyun";
    // Specifies whether to compile into multiple files.
    option java_multiple_files = true;
    // The name of the wrapper class in Java.
    option java_outer_classname = "OrderProtoBuf";
    message Order {
      optional int32 orderId = 1;
      optional string orderName = 2;
      optional double orderPrice = 3;
      optional int64 orderDate = 4;
    }
  2. Use the Protocol Buffers tool to generate the source code.

    Create an empty Maven project and place the Protobuf file in the src/main/proto directory.

    Directory example

    KafkaProtobuf
    - src
      - main
        - java
        - proto
          - order.proto
    - pom.xml

    pom.xml

    Note

    The protoc version used to generate the code must match the protobuf-java dependency version used by Flink, for example 3.21.7. Otherwise, the generated classes might conflict with protobuf-java:3.21.7.

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
            <groupId>com.aliyun</groupId>
            <artifactId>KafkaProtobuf</artifactId>
            <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>3.21.7</version> <!-- The version number must be consistent with the Protobuf version used to generate the code. -->
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>3.3.1</version> <!-- Adjust this based on your Kafka version. -->
            </dependency>
            
            <dependency>
                <groupId>com.github.javafaker</groupId>
                <artifactId>javafaker</artifactId>
                <version>1.0.2</version>
            </dependency>
        </dependencies>
    
        <build>
            <finalName>KafkaProtobuf</finalName>
            <plugins>
                <!-- Java compiler -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.13.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <!-- Use maven-shade to create a fat JAR file that contains all required dependencies. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.5.3</version>
                </plugin>
            </plugins>
        </build>
    </project>

    Run the following command from the project's root directory (where pom.xml is located). This generates three classes in the java directory: the Order message class, the OrderOrBuilder interface, and the OrderProtoBuf outer class.

    protoc --java_out=src/main/java --proto_path=src/main/proto src/main/proto/order.proto
  3. Test the serialization and deserialization. A sketch of the field-presence difference between proto2 and proto3 follows the test code.

    package com.aliyun;
    
    public class OrderTest {
        public static void main(String[] args) {
            // Create an Order object and set its field values.
            Order order = Order.newBuilder()
                    .setOrderId(8513)
                    .setOrderName("flink")
                    .setOrderPrice(99.99)
                    .setOrderDate(System.currentTimeMillis())
                    .build();
    
            // Serialize it into a byte array.
            byte[] serializedBytes = order.toByteArray();
            System.out.println("Byte length after serialization:" + serializedBytes.length);
            // Deserialize it into a new Order object.
            Order deserializedOrder;
            try {
                deserializedOrder = Order.parseFrom(serializedBytes);
            } catch (Exception e) {
                System.err.println("Deserialization failed: " + e.getMessage());
                return;
            }
    
            System.out.println("Original object: \n" + order);
            // Verify that the field values of the deserialized object match the original object.
            if (order.getOrderId() == deserializedOrder.getOrderId() &&
                    order.getOrderName().equals(deserializedOrder.getOrderName()) &&
                order.getOrderPrice() == deserializedOrder.getOrderPrice() &&
                order.getOrderDate() == deserializedOrder.getOrderDate()) {
                System.out.println("Serialization and deserialization test passed!");
            } else {
                System.out.println("Serialization and deserialization test failed!");
            }
    
        }
    }
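
    The comments in the proto3 file note that scalar fields no longer generate hasXxx() methods, so an unset field cannot be told apart from a field explicitly set to its default value. The following minimal sketch, which assumes the classes were generated from the proto2 variant above, illustrates the check that is only available there:

    package com.aliyun;

    public class FieldPresenceTest {
        public static void main(String[] args) {
            // Build an Order without setting orderId.
            Order order = Order.newBuilder()
                    .setOrderName("flink")
                    .build();

            // proto2 generates hasOrderId(), so an unset field can be detected.
            // With the proto3 variant this method is not generated for scalar fields;
            // getOrderId() simply returns the default value 0.
            System.out.println("orderId set? " + order.hasOrderId());   // false
            System.out.println("orderId value: " + order.getOrderId()); // 0
        }
    }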

Step 2: Build test data and write it to Kafka

This example uses ApsaraMQ for Kafka.

  1. Download the SSL root certificate. This certificate is required if you use an SSL endpoint.

  2. Obtain the username and password of the instance.

    • If ACL is disabled for the instance, you can obtain the default username and password from the Configuration Information section on the Instance Details page in the ApsaraMQ for Kafka console.

    • If ACL is enabled for the instance, ensure that the SASL user is a PLAIN user and has been granted permissions to send and receive messages. For more information, see Use the ACL feature for access control.

package com.aliyun;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import com.github.javafaker.Faker; // Import the Faker library to generate a random test dataset.

public class ProtoBufToKafkaTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Set the endpoint. Obtain the endpoint for the topic from the Kafka console.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap_servers>");
        // Access protocol. Use the SASL_SSL protocol.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // Set the absolute path of the SSL root certificate. This file cannot be packaged into the JAR file.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "../only.4096.client.truststore.jks");
        // The password of the root certificate store. Keep it unchanged.
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        // The SASL authentication method. Keep it unchanged.
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // The serialization methods for the message key and value.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        // The maximum wait time for a request, in milliseconds.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // The number of internal retries for the client.
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // The internal retry interval for the client, in milliseconds.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        // Set the hostname verification algorithm to an empty string to disable hostname verification.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"aliyun_flink\" password=\"123456\";");
        // Construct a Producer object. This object is thread-safe. Generally, one Producer object is sufficient for a process.
        // To improve performance, you can construct more objects, but do not create more than five.
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        String topic = "test";

        // Create a list to store three messages.
        List<ProducerRecord<String, byte[]>> messages = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            byte[] value = getProtoTestData();
            ProducerRecord<String, byte[]> kafkaMessage = new ProducerRecord<>(topic, value);
            messages.add(kafkaMessage);
        }

        try {
            // Send messages in a batch.
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            for (ProducerRecord<String, byte[]> message : messages) {
                Future<RecordMetadata> metadataFuture = producer.send(message);
                futures.add(metadataFuture);
            }
            producer.flush();

            // Synchronously get the result of the Future object.
            for (Future<RecordMetadata> future : futures) {
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            // If the message still fails to send after internal retries, your business must handle this type of error.
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }

    private static byte[] getProtoTestData() {
        // Use Faker to generate random data.
        Faker faker = new Faker();
        int orderId = faker.number().numberBetween(1000, 9999); // Generate a random order ID.
        String orderName = faker.commerce().productName(); // Generate a random order name.
        double orderPrice = faker.number().randomDouble(2, 10, 1000); // Generate a random order price.
        long orderDate = System.currentTimeMillis(); // Use the current time as the order date.

        // Create an object according to the defined data structure.
        Order order = Order.newBuilder()
                .setOrderId(orderId)
                .setOrderName(orderName)
                .setOrderPrice(orderPrice)
                .setOrderDate(orderDate)
                .build();

        // Serialize the data to send: Convert the object data into byte data for output.
        return order.toByteArray();
    }
}
  3. Run the test code to write three messages in Protobuf format to the test topic in Kafka. A consumer sketch that verifies the messages outside of Flink follows this step.

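To verify the written messages outside of Flink, a plain Kafka consumer can deserialize the Protobuf payload. The following is a minimal sketch that assumes the same connection settings as the producer above; the consumer group protobuf-verify and the <username> and <password> placeholders are illustrative.

package com.aliyun;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class ProtoBufFromKafkaTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Reuse the same endpoint and SASL_SSL settings as the producer above.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap_servers>");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "../only.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";");
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        // The value is raw Protobuf bytes, so use the byte array deserializer.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Illustrative consumer group; read from the beginning of the topic.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-verify");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, byte[]> record : records) {
                // Deserialize the Protobuf payload back into an Order object.
                Order order = Order.parseFrom(record.value());
                System.out.println("offset=" + record.offset() + ", order:\n" + order);
            }
        }
    }
}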

Step 3: Compile, package, and upload

Upload the compiled and packaged KafkaProtobuf.jar file.


Note

The built-in Protobuf data format is supported only in Ververica Runtime (VVR) 8.0.9 and later. If you use an earlier version, you must add the flink-protobuf-1.17.2.jar dependency file.

Step 4: Read data using Flink SQL

  1. Sample SQL

    Add the protobuf.message-class-name parameter to specify the message class that corresponds to the message body. For more information about Protobuf parameters, see Protobuf Format. A Java Table API sketch that embeds the same statements follows this list.

    CREATE TEMPORARY TABLE KafkaSource (
      orderId INT,
      orderName STRING,
      orderPrice DOUBLE,
      orderDate BIGINT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test',
      'properties.group.id' = 'my-group',  -- The ID of the consumer group.
      'properties.bootstrap.servers' = '<bootstrap_servers>', -- Enter the corresponding Kafka broker address.
      'format' = 'protobuf',              -- The data format used for the value part.
      'protobuf.message-class-name' = 'com.aliyun.Order',    -- Specify the message class that corresponds to the message body.                       
      'scan.startup.mode' = 'earliest-offset'  -- Start reading from the earliest offset.
    );
    
    CREATE TEMPORARY TABLE KafkaSink (
      orderId INT,
      orderName STRING,
      orderPrice DOUBLE,
      orderDate BIGINT
    ) WITH (
        'connector' = 'print'
    );
    
    INSERT INTO KafkaSink
    SELECT * FROM KafkaSource;
  2. Add file dependency references.


  3. Debug the SQL.


  4. View the output after the job is deployed and running.

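If you prefer to embed the same statements in a Java program instead of running them in the SQL editor, the following minimal Table API sketch shows one way to do it. This is an illustrative alternative, not part of the console workflow above; it assumes the Flink Table API dependencies are on the classpath, and the abbreviated DDL should carry the full WITH options from step 1.

package com.aliyun;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ProtobufSqlJob {
    public static void main(String[] args) throws Exception {
        // Create a streaming TableEnvironment.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register the Kafka source table that deserializes the Protobuf-encoded value.
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE KafkaSource (" +
                "  orderId INT, orderName STRING, orderPrice DOUBLE, orderDate BIGINT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'test'," +
                "  'properties.bootstrap.servers' = '<bootstrap_servers>'," +
                "  'properties.group.id' = 'my-group'," +
                "  'format' = 'protobuf'," +
                "  'protobuf.message-class-name' = 'com.aliyun.Order'," +
                "  'scan.startup.mode' = 'earliest-offset'" +
                ")");

        // Register a print sink and submit the INSERT job.
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE KafkaSink (" +
                "  orderId INT, orderName STRING, orderPrice DOUBLE, orderDate BIGINT" +
                ") WITH ('connector' = 'print')");

        tEnv.executeSql("INSERT INTO KafkaSink SELECT * FROM KafkaSource").await();
    }
}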

FAQ

  • After the job reads data from the upstream source and writes it to another Kafka topic downstream, many CORRUPT_MESSAGE warnings appear in the logs.

    Cause: ApsaraMQ for Kafka topics that use local storage and are not created in a Professional Edition (High Write) instance do not support idempotent or transactional writes. Therefore, you cannot use the exactly-once semantics feature provided by the Kafka sink table.

    Solution: Add the configuration item properties.enable.idempotence=false to the sink table to disable the idempotent write feature.

  • The job runtime log contains a NoClassDefFoundError error.

    Cause: The version of the uploaded protobuf-java JAR package is inconsistent with the version used for Protocol Buffers compilation.

    Solution: Check that the versions of the attached dependency files are consistent, that no files are missing, and that the compilation and packaging are complete.

  • The job check reports the following error: Could not find any factory for identifier 'protobuf' that implements one of 'org.apache.flink.table.factories.EncodingFormatFactory'.

    Cause: The built-in Protobuf data format is supported only in VVR 8.0.9 and later.

    Solution: Use VVR 8.0.9 or later, or verify that the flink-protobuf dependency file has been added.