This topic provides the DDL syntax that is used to create an Upsert Kafka result table, describes the parameters in the WITH clause, and provides sample code.

What is Upsert Kafka?

The Upsert Kafka connector is implemented based on the Upsert Kafka connector of the Apache Flink community. For more information, see Upsert Kafka SQL Connector. Upsert Kafka can consume changelog streams: Flink writes INSERT and UPDATE_AFTER records to Kafka topics as normal Kafka messages, and writes DELETE records to Kafka topics as messages with null values, which Kafka treats as tombstones. A record with a null value therefore represents a DELETE event. Flink partitions data based on the values of the primary key columns, so messages with the same primary key land in the same partition. This guarantees that UPDATE and DELETE records for the same key are consumed in order.
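For example, for a result table whose primary key is user_region, the sink topic might contain the following message sequence. This is an illustrative sketch only; the actual byte layout depends on the configured key.format and value.format.
key: "Berlin"    value: {"user_region": "Berlin", "pv": 10, "uv": 5}    -- INSERT or UPDATE_AFTER
key: "Berlin"    value: {"user_region": "Berlin", "pv": 11, "uv": 5}    -- a later UPDATE_AFTER replaces the previous value for this key
key: "Berlin"    value: null                                            -- DELETE (tombstone)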

Limits

  • Only Flink that uses Ververica Runtime (VVR) 2.1.4 or later supports the Upsert Kafka connector.
  • The Upsert Kafka connector can write data only to Kafka 0.10 or later.
  • The Upsert Kafka connector supports only the producer configuration parameters of Kafka 2.4. For more information about the configuration parameters of the Kafka producer, see Producer Configs.

DDL syntax

A table named upsert_kafka_sink is used as an example in the following DDL statement.
CREATE TABLE upsert_kafka_sink (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
Note: You must specify a primary key for an Upsert Kafka result table.
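A primary key can also span multiple columns, in which case all listed columns are serialized into the Kafka message key. The following DDL is an illustrative sketch; the table and column names are hypothetical.
CREATE TABLE upsert_kafka_sink_composite (
  user_region STRING,
  page_id BIGINT,
  pv BIGINT,
  -- All primary key columns together form the Kafka message key.
  PRIMARY KEY (user_region, page_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);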

Parameters in the WITH clause

  • connector (required, STRING): The type of the result table. Set the value to upsert-kafka.
  • topic (required, STRING): The topic of the result table.
  • properties.bootstrap.servers (required, STRING): The IP addresses or endpoints of the Kafka brokers, in the format host:port,host:port,host:port. Separate multiple addresses with commas (,).
  • key.format (required, STRING): The format that is used to serialize the key fields of Upsert Kafka messages. Valid values: csv, json, and avro.
  • value.format (required, STRING): The format that is used to serialize the value fields of Upsert Kafka messages. Valid values: csv, json, and avro.
  • value.fields-include (optional, STRING): Specifies the fields that are included in the value of Kafka messages. Valid values:
      • ALL: All fields of the table schema are included. This is the default value.
      • EXCEPT_KEY: All fields of the table schema are included, except the primary key fields.
  • properties.* (optional, STRING): Additional Kafka parameters. The suffix must match a configuration key that is defined in the Apache Kafka documentation. Flink automatically removes the properties. prefix and passes the remaining parameter names and values to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation.
    Note: You cannot use this parameter to specify key.deserializer or value.deserializer, because Flink overwrites the values of these two parameters.
For more information about the configuration parameters of the Kafka producer, see Producer Configs. To configure the Kafka producer that is used by the connector, add the properties. prefix to the Kafka producer configuration parameters and append them to the WITH clause. The following sample code shows the configuration for a Message Queue for Apache Kafka cluster that requires Simple Authentication and Security Layer (SASL) authentication.
CREATE TABLE kafkaTable (
    ...
) WITH (
    ...
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);
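The value.fields-include parameter controls whether the primary key columns are repeated in the message value. The following DDL is an illustrative sketch (the table name is hypothetical): with EXCEPT_KEY, the user_region column is carried only in the message key and is omitted from the value.
CREATE TABLE upsert_kafka_sink_except_key (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json',
  -- Omit the primary key columns from the message value.
  'value.fields-include' = 'EXCEPT_KEY'
);
With this configuration, a message for the Berlin region carries the key {"user_region": "Berlin"} and the value {"pv": 10, "uv": 5}.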

Sample code

--Create a source table that stores page view data. 
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP(3),
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);

--Create an Upsert Kafka result table. 
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

--Write the PV and UV data to the result table. 
INSERT INTO pageviews_per_region 
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
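
Because the topic now contains an upsert stream that is keyed by user_region, the same connector can also read the topic back as a changelog source in another deployment, provided that your VVR version supports Upsert Kafka source tables. The following DDL is a minimal sketch under that assumption.
CREATE TABLE pageviews_per_region_source (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

--The latest non-null value for each key is interpreted as an upsert, and a null value is interpreted as a delete.
SELECT user_region, pv, uv FROM pageviews_per_region_source;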