This topic provides the DDL syntax that is used to create an Upsert Kafka source table, describes the parameters in the WITH clause, and provides sample code.

What is Upsert Kafka?

Upsert Kafka is implemented based on the Upsert Kafka connector of the Apache Flink community. For more information, see Upsert Kafka SQL connector. Upsert Kafka produces a changelog stream and allows you to read data from and write data to Kafka topics in the upsert fashion. Each data record in the changelog stream represents an insert, update, or delete event and is processed in the following way:
  • If the key of a data record already exists in the Kafka topic, the value in the data record overwrites the last value of that key. The data record is interpreted as an UPDATE.
  • If the key of a data record does not exist in the Kafka topic, the value in the data record is inserted into the topic. The data record is interpreted as an INSERT.
  • If the value of a data record is null, the data record is interpreted as a DELETE of the corresponding key.
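For example, assume a table keyed on user_region with the json Key and Value formats; the sample messages below are hypothetical. The following sequence of Kafka messages, shown as key => value pairs, produces the changelog events noted in the comments:

{"user_region": "EU"} => {"pv": 5, "uv": 3}    -- INSERT: the key EU does not exist yet
{"user_region": "EU"} => {"pv": 7, "uv": 4}    -- UPDATE: the key EU exists, so its value is overwritten
{"user_region": "EU"} => null                  -- DELETE: a null value removes the key EU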

Prerequisites

Resources are created in the Message Queue for Apache Kafka console. For more information, see Step 3: Create resources.

Limits

Only Flink that uses Ververica Runtime (VVR) 3.0.1 or later supports the Upsert Kafka connector.

DDL syntax

In this example, the pageviews_per_region table is used.
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<yourServers>',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
Note You must specify a primary key when you create an Upsert Kafka table.

Parameters in the WITH clause

connector
  Required: Yes. Data type: String.
  The type of the source table. Set the value to upsert-kafka.

topic
  Required: Yes. Data type: String.
  The name of the Kafka topic of the source table.

properties.bootstrap.servers
  Required: Yes. Data type: String.
  The IP addresses or endpoints and port numbers of the Kafka brokers. Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

key.format
  Required: Yes. Data type: String.
  The format used to deserialize the Key field of Upsert Kafka messages. Valid values:
  • csv
  • json
  • avro

value.format
  Required: Yes. Data type: String.
  The format used to deserialize the Value field of Upsert Kafka messages. Valid values:
  • csv
  • json
  • avro

value.fields-include
  Required: No. Data type: String. Default value: ALL.
  Specifies the fields that appear in the Value field. Valid values:
  • ALL: all fields of the table schema, including the primary key fields. This is the default value.
  • EXCEPT_KEY: all fields of the table schema, except the primary key fields.

key.fields-prefix
  Required: No. Data type: String.
  A custom prefix for all fields of the primary key. The prefix prevents name conflicts between the Key and Value fields. By default, the prefix is empty. If a prefix is defined, both the table schema and the key.fields parameter use the prefixed names. When the data type of the Key format is constructed, the prefix is removed and the non-prefixed names are used within the Key format.
  Note The key.fields-prefix parameter takes effect only when you set value.fields-include to EXCEPT_KEY.

properties.*
  Required: No. Data type: String.
  Additional parameters that are passed to the Kafka client. The suffix must match a configuration key that is defined in the Apache Kafka documentation. Flink removes the properties. prefix and passes the remaining key and its value to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation.
  Note You cannot use this parameter to specify the key.deserializer or value.deserializer parameter, because Flink overrides the values of these two parameters.
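
The following DDL is a minimal sketch of how key.fields-prefix, value.fields-include, and properties.* work together. The table name pageviews_per_region_prefixed and the json formats are assumptions for illustration. In this sketch, the table column key_user_region maps to the field user_region in the Key format (the key_ prefix is removed), and the Value field contains only pv and uv:

CREATE TABLE pageviews_per_region_prefixed (
  key_user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (key_user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<yourServers>',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields-prefix' = 'key_',
  'value.fields-include' = 'EXCEPT_KEY',
  -- A pass-through Kafka client parameter (properties.*):
  'properties.allow.auto.create.topics' = 'false'
);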

Example

CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '<yourServers>',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

CREATE TABLE pageviews_per_region_db (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = '<yourUrl>',
  'table-name' = 'users'
);

INSERT INTO pageviews_per_region_db
SELECT user_region, pv, uv FROM pageviews_per_region;
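
In this example, Flink reads the changelog stream from the pageviews_per_region Upsert Kafka source table and continuously synchronizes the latest pv and uv values of each user_region to the users table in the destination database.

For reference, the following is a minimal sketch of how such a topic might be populated upstream. It assumes a hypothetical raw pageviews Kafka source table and uses the pageviews_per_region table from this topic as the result table; each update of the grouped aggregation is written as an upsert keyed by user_region:

CREATE TABLE pageviews (
  user_region STRING,
  user_id BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '<yourServers>',
  'properties.group.id' = '<yourGroupId>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Each update of the aggregation result is written to the
-- pageviews_per_region topic as an upsert keyed by user_region.
INSERT INTO pageviews_per_region
SELECT user_region, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv
FROM pageviews
GROUP BY user_region;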