This topic provides the DDL syntax that is used to create an Upsert Kafka result table, describes the parameters in the WITH clause, and provides sample code.
What is Upsert Kafka?
Upsert Kafka is implemented based on the Upsert Kafka connector of the Apache Flink community. For more information, see Upsert Kafka SQL Connector. The Upsert Kafka connector can consume changelog streams. It allows Flink to write INSERT and UPDATE_AFTER data to Kafka topics as normal Kafka messages and to write DELETE data to Kafka topics as messages with null values, which act as tombstones for the corresponding keys. Flink partitions data based on the values of the primary key columns, so all messages with the same primary key are written to the same partition and therefore stay in order. This guarantees that UPDATE and DELETE messages for the same primary key are consumed in the order in which they were produced.
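For example, assuming JSON is used for both the key and value formats, a hypothetical changelog for a per-region page view count maps to Kafka records as follows:
-- +I (beijing, 1): key {"user_region":"beijing"}, value {"user_region":"beijing","pv":1}
-- +U (beijing, 2): key {"user_region":"beijing"}, value {"user_region":"beijing","pv":2}
-- -D (beijing, 2): key {"user_region":"beijing"}, value null (a tombstone that represents the DELETE event)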
Limits
- Only Flink with Ververica Runtime (VVR) 2.1.4 or later supports the Upsert Kafka connector.
- The Upsert Kafka connector can write data only to Kafka 0.10 or later.
- The Upsert Kafka connector supports only the producer configuration parameters of Kafka 2.4. For more information about the configuration parameters of the Kafka producer, see Producer Configs.
DDL syntax
CREATE TABLE upsert_kafka_sink (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
Parameters in the WITH clause
Parameter | Description | Required | Data type | Remarks |
---|---|---|---|---|
connector | The type of the result table. | Yes | STRING | Set the value to upsert-kafka. |
topic | The topic of the result table. | Yes | STRING | N/A. |
properties.bootstrap.servers | The IP addresses or endpoints of Kafka brokers. | Yes | STRING | Format: host:port,host:port,host:port. Separate multiple addresses with commas (,). |
key.format | The format that is used to serialize the Key field of Upsert Kafka messages. | Yes | STRING | Valid values: csv, json, and avro. |
value.format | The format that is used to serialize the Value field of Upsert Kafka messages. | Yes | STRING | Valid values: csv, json, and avro. |
value.fields-include | Specifies which fields are included in the Value field. | No | STRING | Valid values: ALL (default): the Value field contains all fields of the table schema, including the primary key fields. EXCEPT_KEY: the Value field contains all fields of the table schema except the primary key fields. |
properties.* | Additional Kafka client parameters. | No | STRING | The suffix must match a parameter that is defined in the Apache Kafka Documentation. Flink automatically removes the properties. prefix and passes the remaining parameter names and values to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation. Note: You cannot use this parameter to specify the key.deserializer or value.deserializer parameter because Flink overwrites the values of these two parameters. |
To configure the Kafka producer, add the properties. prefix before the Kafka producer configuration parameters and append the configuration to the WITH clause. The following sample code shows how to connect to a Message Queue for Apache Kafka cluster that requires Simple Authentication and Security Layer (SASL) authentication.
CREATE TABLE kafkaTable (
  ...
) WITH (
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);
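The other parameters from the table above can be combined in the same way. The following is a minimal sketch, not a definitive configuration: it excludes the primary key column from the Value field by using value.fields-include and disables automatic topic creation by using properties.allow.auto.create.topics. The table name, topic, and broker addresses are placeholders.
CREATE TABLE upsert_kafka_sink_except_key (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json',
  -- Write only pv and uv to the Value field; user_region travels in the Key field.
  'value.fields-include' = 'EXCEPT_KEY',
  -- Passed to the Kafka client as allow.auto.create.topics=false.
  'properties.allow.auto.create.topics' = 'false'
);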
Sample code
--Create a source table that stores page view data.
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);
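For reference, this source table expects JSON messages of the following shape in its topic. The values shown are hypothetical:
-- {"user_id": 1001, "page_id": 17, "viewtime": "2021-01-01 12:00:00", "user_region": "beijing"}
-- {"user_id": 1002, "page_id": 17, "viewtime": "2021-01-01 12:00:05", "user_region": "hangzhou"}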
--Create an Upsert Kafka result table.
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
--Compute the PVs and UVs for each region and write the results to the result table.
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
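As page views arrive, the aggregate for each user_region is recomputed, and each change is written to the result topic as an upsert keyed by user_region. A hypothetical sequence for one region is shown below as JSON for readability, although the sample above uses Avro. Because all records for a region share the same key, they land in the same partition, and downstream consumers can keep only the latest value per key:
-- key {"user_region":"beijing"}, value {"user_region":"beijing","pv":1,"uv":1}
-- key {"user_region":"beijing"}, value {"user_region":"beijing","pv":2,"uv":1}
-- key {"user_region":"beijing"}, value {"user_region":"beijing","pv":3,"uv":2}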