Realtime Compute for Apache Flink:Upsert Kafka connector

Last Updated: Nov 22, 2023

This topic describes how to use the Upsert Kafka connector.

Background information

The Upsert Kafka connector can be used to read data from and write data to Kafka topics in the upsert fashion.

  • The Upsert Kafka connector for a source table converts data that is stored in Kafka topics into a changelog stream. Each data record in the changelog stream represents an update event or a delete event. If the key in a data record already exists in the Kafka topic, the value in the data record overwrites the last value of that key, and the data record is interpreted as UPDATE. If the key does not exist yet, the data record is interpreted as INSERT. Each data record in a changelog stream is therefore interpreted as UPSERT (INSERT or UPDATE) because an existing row with the same key is always overwritten. If the value in a data record is null, the data record is interpreted as DELETE for that key. The sketch after this list illustrates this interpretation.

  • The Upsert Kafka connector for a result table can consume changelog streams that are produced by the source. It writes INSERT and UPDATE_AFTER data to Kafka topics as normal Kafka messages, and writes DELETE data as Kafka messages with null values, which act as tombstones indicating that the messages with the corresponding keys are deleted. Flink partitions data based on the values of the primary key columns, which guarantees message ordering for each primary key. Therefore, UPDATE and DELETE data that contains the same primary key is written to the same partition.
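The following sketch illustrates how messages in a Kafka topic keyed by user_region are interpreted. The keys and values are hypothetical and are shown only to demonstrate the INSERT, UPDATE, and DELETE semantics described above:

key = hangzhou, value = {"pv": 1, "uv": 1}   -> interpreted as INSERT (no previous value for the key)
key = hangzhou, value = {"pv": 5, "uv": 3}   -> interpreted as UPDATE (overwrites the previous value of the key)
key = hangzhou, value = null                 -> interpreted as DELETE (tombstone for the key)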

Table type: Source table and result table

Running mode: Streaming mode and batch mode

Data format: avro, avro-confluent, csv, json, and raw

Metrics:

  • Metrics for source tables: numRecordsIn, numRecordsInPerSecond, numBytesIn, numBytesInPerSecond, currentEmitEventTimeLag, currentFetchEventTimeLag, sourceIdleTime, and pendingRecords

  • Metrics for result tables: numRecordsOut, numRecordsOutPerSecond, numBytesOut, numBytesOutPerSecond, and currentSendTime

API type: SQL API

Data update or deletion in a result table: Supported

Prerequisites

  • A Kafka cluster is created. For more information, see Create a Dataflow Kafka cluster or Create resources in Kafka.

  • A network connection is established between Realtime Compute for Apache Flink and the Kafka cluster. For more information about how to establish a network connection between Realtime Compute for Apache Flink and a Kafka cluster that is created in E-MapReduce (EMR), see Create and manage a VPC and Overview. For more information about how to establish a network connection between Realtime Compute for Apache Flink and an ApsaraMQ for Kafka cluster, see Configure a whitelist.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the Upsert Kafka connector.

  • The Upsert Kafka connector can be used to read or write only data of Apache Kafka 0.10 or later.

  • The Upsert Kafka connector supports only the client parameters of Apache Kafka 2.8. For more information about the configuration parameters of the Kafka producer and consumer, see Consumer Configs and Producer Configs.

  • If an Upsert Kafka result table uses the exactly-once semantics, you must enable the Kafka transaction mechanism to write data to a Kafka cluster. The version of the Kafka cluster must be Apache Kafka 0.11 or later.

Syntax

CREATE TABLE upsert_kafka_sink (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

Parameters in the WITH clause

  • Common parameters

    connector
      Description: The type of the table.
      Data type: STRING
      Required: Yes
      Default value: No default value
      Remarks: Set the value to upsert-kafka.

    properties.bootstrap.servers
      Description: The IP addresses or endpoints and port numbers of Kafka brokers.
      Data type: STRING
      Required: Yes
      Default value: No default value
      Remarks: Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

    properties.*
      Description: The parameters that are configured for the Kafka client.
      Data type: STRING
      Required: No
      Default value: No default value
      Remarks: The suffix of this parameter must comply with the rules that are defined in Producer Configs and Consumer Configs. Flink removes the properties. prefix and passes the remaining keys and values to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation. You cannot modify the following parameters by adding the properties. prefix because their values are overwritten by the Upsert Kafka connector:
      • key.deserializer
      • value.deserializer

    format
      Description: The format used to read or write the value field of Kafka messages.
      Data type: STRING
      Required: No
      Default value: No default value
      Remarks: Valid values:
      • csv
      • json
      • avro
      • debezium-json
      • canal-json
      • maxwell-json
      • avro-confluent
      • raw

    key.format
      Description: The format used to read or write the key field of Kafka messages.
      Data type: STRING
      Required: Yes
      Default value: No default value
      Remarks: If you configure this parameter, you must also configure the key.fields or key.fields-prefix parameter. Valid values:
      • csv
      • json
      • avro
      • debezium-json
      • canal-json
      • maxwell-json
      • avro-confluent
      • raw

    key.fields
      Description: The key fields in the source table or result table that correspond to the key fields of Kafka messages.
      Data type: STRING
      Required: No
      Default value: No default value
      Remarks: Separate multiple field names with semicolons (;), such as field1;field2.

    key.fields-prefix
      Description: A custom prefix for all key fields in Kafka messages. You can configure this parameter to prevent name conflicts with the value fields.
      Data type: STRING
      Required: No
      Default value: No default value
      Remarks: This parameter is used only to distinguish the column names of source tables and result tables. The prefix is removed from the column names when the key fields of Kafka messages are parsed and generated.
      Note: If you configure this parameter, you must set the value.fields-include parameter to EXCEPT_KEY.

    value.format
      Description: The format used to read or write the value field of Kafka messages.
      Data type: STRING
      Required: Yes
      Default value: No default value
      Remarks: The configuration of this parameter is equivalent to the configuration of the format parameter. Therefore, the format and value.format parameters cannot be used together. If you configure both parameters, a conflict occurs.

    value.fields-include
      Description: Specifies whether to include the fields that correspond to message keys when the value fields of Kafka messages are parsed or generated.
      Data type: STRING
      Required: Yes
      Default value: ALL
      Remarks: Valid values:
      • ALL: All fields are processed as value fields of Kafka messages. This is the default value.
      • EXCEPT_KEY: All fields except the fields specified by the key.fields parameter are processed as value fields of Kafka messages.

    topic
      Description: The name of the topic from which data is read or to which data is written.
      Data type: STRING
      Required: Yes
      Default value: No default value
      Remarks: N/A.
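    The following DDL shows how several of these parameters can be combined. It is a minimal sketch: the table name, topic name, broker endpoint, and column names are placeholders, and the key.fields-prefix and value.fields-include settings are included only to illustrate how key columns can be distinguished from value columns.

    CREATE TABLE upsert_kafka_with_options (
      k_user_region STRING,  -- key column; the k_ prefix is removed when the Kafka message key is generated
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY (k_user_region) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      -- The properties. prefix is removed and the remaining key is passed to the Kafka client.
      'properties.allow.auto.create.topics' = 'false',
      'key.format' = 'json',
      'key.fields-prefix' = 'k_',
      'value.format' = 'json',
      -- Required when key.fields-prefix is configured.
      'value.fields-include' = 'EXCEPT_KEY'
    );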

  • Parameters only for result tables

    sink.parallelism
      Description: The parallelism of operators for the Kafka result table.
      Data type: INTEGER
      Required: No
      Default value: The parallelism of the upstream operators, which is determined by the framework.
      Remarks: N/A.

    sink.buffer-flush.max-rows
      Description: The maximum number of data records that can be cached before the cache is flushed.
      Data type: INTEGER
      Required: No
      Default value: 0 (disabled)
      Remarks: If the result table receives a large number of updates on the same key, only the last record for each key is retained in the cache. Caching therefore reduces the amount of data that is written to Kafka topics and prevents potential tombstone messages from being sent.
      Note: To enable data caching for result tables, you must set both the sink.buffer-flush.max-rows and sink.buffer-flush.interval parameters to values greater than 0.

    sink.buffer-flush.interval
      Description: The interval at which the cache is flushed.
      Data type: DURATION
      Required: No
      Default value: 0 (disabled)
      Remarks: The unit can be milliseconds, seconds, minutes, or hours. For example, you can configure 'sink.buffer-flush.interval' = '1 s'. If the result table receives a large number of updates on the same key, only the last record for each key is retained in the cache. Caching therefore reduces the amount of data that is written to Kafka topics and prevents potential tombstone messages from being sent.
      Note: To enable data caching for result tables, you must set both the sink.buffer-flush.max-rows and sink.buffer-flush.interval parameters to values greater than 0.
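    The following DDL is a minimal sketch of a result table with data caching enabled. The table name, topic name, and broker endpoint are placeholders, and the flush thresholds are example values only:

    CREATE TABLE upsert_kafka_buffered_sink (
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY (user_region) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'key.format' = 'json',
      'value.format' = 'json',
      -- Both parameters must be greater than 0 to enable data caching.
      'sink.buffer-flush.max-rows' = '1000',
      'sink.buffer-flush.interval' = '1 s'
    );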

Sample code

  • Sample code for a source table

    Create a Kafka source table that contains the browsing data of website users.

    CREATE TABLE pageviews (
      user_id BIGINT,
      page_id BIGINT,
      viewtime TIMESTAMP(3),  -- the watermark column must be of type TIMESTAMP(3)
      user_region STRING,
      WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'format' = 'json'
    );
  • Sample code for a result table

    • Create an Upsert Kafka result table.

      CREATE TABLE pageviews_per_region (
        user_region STRING,
        pv BIGINT,
        uv BIGINT,
        PRIMARY KEY (user_region) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = '<yourTopicName>',
        'properties.bootstrap.servers' = '...',
        'key.format' = 'avro',
        'value.format' = 'avro'
      );
    • Write the browsing data of website users to the result table.

      INSERT INTO pageviews_per_region
      SELECT
        user_region,
        COUNT(*),
        COUNT(DISTINCT user_id)
      FROM pageviews
      GROUP BY user_region;
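  • Sample code for reading the result table as a source

    Because the Upsert Kafka connector also supports source tables, the aggregated results can be read back as a changelog stream. The following DDL is a minimal sketch that assumes the same topic, broker endpoint, and formats as the result table above:

    CREATE TABLE pageviews_per_region_source (
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY (user_region) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'key.format' = 'avro',
      'value.format' = 'avro'
    );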

Best practices

Synchronize data from all tables in a MySQL database to Kafka