Realtime Compute for Apache Flink: Using a Partitioner to implement custom partition writing

Last Updated: May 09, 2025

This topic describes how to implement custom partition logic based on FlinkKafkaPartitioner so that you can control which Kafka partitions your data is written to.

Partition modes

You can select a partition mode for the Kafka connector by setting the sink.partitioner parameter. The following table describes the built-in modes; a sketch after the table illustrates the mapping that the fixed mode applies. If none of the built-in modes meets your requirements, implement a custom partitioner to control how data is written.

Mode: default (default value)

Partition logic:

  • Messages with a key: messages that have the same key are written to the same partition, which preserves order within the partition.

  • Messages without a key: messages are distributed across partitions in a round-robin manner for load balancing.

Mode: fixed

Partition logic: Each Flink sink subtask writes to a fixed partition, determined by the subtask index modulo the number of partitions.

  • If there are 3 Kafka partitions and a parallelism of 3, each subtask writes to exactly one partition.

  • If there are 4 Kafka partitions and a parallelism of 3, one partition does not receive any data.

  • If there are 2 Kafka partitions and a parallelism of 3, several subtasks write to the same partition.

Mode: round-robin

Partition logic: Records from each Flink subtask are distributed across the Kafka partitions in a round-robin manner.
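For reference, the fixed mode maps each sink subtask to a partition by taking the subtask index modulo the number of partitions. The following minimal sketch shows that mapping; it is illustrative only, not the connector's source code, and the class name FixedModeSketch is made up.

package com.aliyun;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Illustrative sketch of the mapping applied by the fixed mode: each sink
// subtask always writes to the same partition, chosen by
// subtaskIndex % numberOfPartitions.
public class FixedModeSketch extends FlinkKafkaPartitioner<Object> {
    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // Remember which sink subtask this partitioner instance belongs to.
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(Object record, byte[] key, byte[] value, String topic, int[] partitions) {
        // With 4 partitions and a parallelism of 3, partition index 3 receives no data;
        // with 2 partitions and a parallelism of 3, subtasks 0 and 2 both write to partition 0.
        return partitions[parallelInstanceId % partitions.length];
    }
}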

Implementing custom partitioning in SQL jobs

Step 1: Write a custom partitioner

The partitioner needs to extend the FlinkKafkaPartitioner class and override the partition method.

KafkaSinkPartitioner.java

Custom partition logic: extract the last two digits of the date field (a string) and take them modulo the number of partitions so that data with the same date lands in the same partition (the target topic has 3 partitions).

package com.aliyun;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.data.GenericRowData;

public class KafkaSinkPartitioner extends FlinkKafkaPartitioner<Object> {

    @Override
    public int partition(Object record, byte[] key, byte[] value, String topic, int[] partitions) {
        // In Flink SQL, the record passed to the partitioner is a GenericRowData.
        if (record instanceof GenericRowData) {
            GenericRowData rowData = (GenericRowData) record;
            // You can iterate over the fields to inspect the whole row (debugging only).
            for (int i = 0; i < rowData.getArity(); i++) {
                System.out.println("index: " + i + " : " + rowData.getField(i));
            }
            // Take the last two digits of the date field (index 2) so that data with
            // the same date is written to the same partition. The target topic has 3 partitions.
            String dateString = rowData.getString(2).toString();
            int date = Integer.parseInt(dateString.substring(dateString.length() - 2));
            return date % 3;
        } else {
            throw new IllegalArgumentException("record is not GenericRowData");
        }
    }
}
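If you want to sanity-check the mapping before deploying, you can run a small local program such as the following. This is a hypothetical helper class (not part of the job); it builds rows like the sample records used later in the verification step and prints the partition each one maps to.

package com.aliyun;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;

public class KafkaSinkPartitionerCheck {
    public static void main(String[] args) {
        KafkaSinkPartitioner partitioner = new KafkaSinkPartitioner();
        String[][] samples = {
                {"yun001", "flink", "20250501"},
                {"yun002", "flink", "20250505"},
                {"yun003", "flink", "20250505"},
                {"yun004", "flink", "20250505"}
        };
        int[] partitions = {0, 1, 2}; // the target topic has 3 partitions
        for (String[] s : samples) {
            // Build a row in the same shape as the SQL sink table (order_id, order_name, dt).
            GenericRowData row = GenericRowData.of(
                    StringData.fromString(s[0]),
                    StringData.fromString(s[1]),
                    StringData.fromString(s[2]));
            int p = partitioner.partition(row, null, null, "sink", partitions);
            System.out.println(String.join(",", s) + " -> partition " + p);
        }
        // Expected: 20250501 maps to 01 % 3 = 1, 20250505 maps to 05 % 3 = 2.
    }
}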

Step 2: Write the SQL job

Configure the sink.partitioner parameter in the SQL job so that the sink uses the custom partitioner. This example uses ApsaraMQ for Kafka for demonstration.

CREATE TEMPORARY TABLE KafkaSource (
  order_id STRING,
  order_name STRING,
  dt STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.group.id' = 'test-group',  -- Consumer group ID
  'properties.bootstrap.servers' = '<bootstrap.servers>', -- Fill in the corresponding Kafka broker address.
  'format' = 'csv',                        -- Data format used for the value part
  'scan.startup.mode' = 'earliest-offset'  -- Start reading from the earliest offset of each Kafka partition.
);

CREATE TEMPORARY TABLE kafkaSink (
  order_id STRING,
  order_name STRING,
  dt STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<bootstrap.servers>',
  'format' = 'csv',
  'properties.enable.idempotence' = 'false',
  'sink.partitioner' = 'com.aliyun.KafkaSinkPartitioner'
);

INSERT INTO kafkaSink
SELECT * FROM KafkaSource;

pom.xml

Other versions of the VVR Kafka connector are available in the Maven Central Repository.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun</groupId>
    <artifactId>KafkaPartitionerSQL</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
    <!--Kafka connector-->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>1.17-vvr-8.0.11-1</version>
        </dependency>
    <!--flink dependencies-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.2</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>KafkaPartitionerSQL</finalName>
        <plugins>
            <!-- Java compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Use maven-shade-plugin to create a fat JAR file that contains all the necessary dependencies. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Important
  • ApsaraMQ for Kafka does not support idempotent or transactional writes, so it cannot provide exactly-once semantics. You must add 'properties.enable.idempotence' = 'false' to the result table to disable idempotent writes.

  • For sink.partitioner, specify the fully qualified name of the class that extends FlinkKafkaPartitioner, such as com.aliyun.KafkaSinkPartitioner.

  • For more information, see Kafka connector WITH parameters.

Step 3: Package and upload the custom partitioner

Use the File Management feature of the Realtime Compute for Apache Flink console to upload the compiled JAR package.

Step 4: Reference the partitioner in the job

After you add the JAR package as an Additional Dependency File, set the sink.partitioner parameter in the WITH clause to the fully qualified class name of the partitioner, which is com.aliyun.KafkaSinkPartitioner in this example.

Step 5: Verification test

Use the quick message sending and receiving feature of ApsaraMQ for Kafka to test the job.

  1. Log on to the ApsaraMQ for Kafka console.

  2. On the Overview page, select a region in the Resource Distribution section.

  3. On the Instance List page, click the name of the target instance.

  4. On the Topic Management page, select the target topic and click Experience Message Sending.

    yun001,flink,20250501
    yun002,flink,20250505
    yun003,flink,20250505
    yun004,flink,20250505
  5. After you send the four messages, check how they are distributed across the partitions of the downstream topic.

    You can see that data with the same date is written to the same partition: the record with date 20250501 (01 % 3 = 1) goes to partition 1, and the three records with date 20250505 (05 % 3 = 2) go to partition 2.

Implementing custom partitioning in JAR jobs

Step 1: Write a JAR job

Use the setPartitioner method of the VVR Kafka connector's record serialization schema builder to specify the custom partitioning class.

KafkaPartitionerDataStream.java

Custom partition logic: extract the last two digits of the date field (a string) and take them modulo the number of partitions so that data with the same date lands in the same partition (the target topic has 3 partitions).

package com.aliyun;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class KafkaPartitionerDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build Kafka Source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("<BootstrapServers>")
                .setTopics("source")
                .setGroupId("test-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();
        // Use no watermark strategy
        DataStreamSource<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // Build Kafka Sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("<BootstrapServers>")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("sink")
                                // Custom partition
                                .setPartitioner(new KafkaSinkPartitioner())      
                                .setKafkaValueSerializer(StringSerializer.class) 
                                .build())
                .build();
        source.sinkTo(sink).name("Kafka Sink");
        env.execute();
    }
    
    static class KafkaSinkPartitioner extends FlinkKafkaPartitioner<String> {
        // Take the last two digits of the date field so that data with the same
        // date is written to the same partition. The target topic has 3 partitions.
        @Override
        public int partition(String record, byte[] key, byte[] value, String topic, int[] partitions) {
            String[] fields = record.split(",");
            String dt = fields[2];
            int date = Integer.parseInt(dt.substring(dt.length() - 2));
            return date % 3;
        }
    }

}
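The example above hard-codes a partition count of 3 to match the target topic. If you prefer not to rely on that assumption, a variant like the following (a sketch with a made-up class name, not something the connector requires) derives the target from the array of partition IDs that Flink passes to the partition method:

package com.aliyun;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Variant sketch: pick the target from the partition IDs passed in, instead of
// hard-coding a partition count of 3.
public class DatePartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String record, byte[] key, byte[] value, String topic, int[] partitions) {
        String[] fields = record.split(",");
        String dt = fields[2];
        int lastTwoDigits = Integer.parseInt(dt.substring(dt.length() - 2));
        // partitions contains the IDs of the sink topic's partitions.
        return partitions[lastTwoDigits % partitions.length];
    }
}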

pom.xml

Other versions of the VVR Kafka connector are available in the Maven Central Repository.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun</groupId>
    <artifactId>KafkaPartitionerDataStream</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--VVR Kafka Connector-->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>1.17-vvr-8.0.11-1</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.17.2</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <finalName>KafkaPartitionerDS</finalName>
        <plugins>
            <!-- Java compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.13.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Use maven-shade-plugin to create a fat JAR file that contains all the necessary dependencies. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.KafkaPartitionerDataStream</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>
</project>

Step 2: Package and upload the job

Use the File Management feature of the Realtime Compute for Apache Flink console to upload the compiled JAR package. It is recommended that you declare connector dependencies with the provided scope and add them as additional dependency files: this keeps versions easier to manage and produces smaller JAR packages that are faster to upload. For more information, see Connector dependencies and usage.

Step 3: Deploy the job

For more information about job deployment parameters, see Deploy JAR jobs.

Step 4: Verification test

Use the quick message sending and receiving feature of ApsaraMQ for Kafka to test the job.

  1. Log on to the ApsaraMQ for Kafka console.

  2. On the Overview page, select a region in the Resource Distribution section.

  3. On the Instance List page, click the name of the target instance.

  4. On the Topic Management page, select the target topic and click Experience Message Sending.

    yun001,flink,20250501
    yun002,flink,20250505
    yun003,flink,20250505
    yun004,flink,20250505
  5. After you send the four messages, check how they are distributed across the partitions of the downstream topic.

    You can see that data with the same date is written to the same partition.

References