This topic provides the DDL syntax that is used to create a Message Queue for Apache Kafka source table, describes the parameters in the WITH clause, and lists the metadata columns and examples.
What is a Message Queue for Apache Kafka source table?
Message Queue for Apache Kafka is a distributed, high-throughput, and scalable message queue service provided by Alibaba Cloud. This service is widely used in big data applications, such as log collection, monitoring data aggregation, streaming data processing, and both online and offline data analysis.
Prerequisites
- Resources are created in the Message Queue for Apache Kafka console. For more information, see Step 3: Create resources.
- A whitelist is configured for the Message Queue for Apache Kafka cluster. For more information, see Configure the whitelist.
Limits
- Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports Message Queue for Apache Kafka connectors.
- A Message Queue for Apache Kafka connector can read only data of Kafka 0.10 or later.
- A Message Queue for Apache Kafka connector supports only the consumer configuration items of Kafka 2.4. For more information, see Consumer Configs in the Kafka 2.4 documentation.
- Only the Flink compute engine of vvr-4.0.12-flink-1.13 or later allows you to execute the CREATE TABLE AS statement to synchronize data from a Message Queue for Apache Kafka source table.
- Flink can infer the data types of columns and change the table schema only for tables in the JSON format. Flink cannot perform these operations on tables in other formats.
- If you execute the CREATE TABLE AS statement to synchronize data from a Message Queue for Apache Kafka source table, you can synchronize JSON changes only to Hudi and Hologres result tables.
- Flink can infer data types and change the schema only for the value columns in a Message Queue for Apache Kafka source table. To synchronize data of the key columns in a Message Queue for Apache Kafka source table, you must specify the key columns in the DDL statement. For more information, see Example 3.
DDL syntax
CREATE TABLE kafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
`topic` STRING METADATA VIRTUAL,
`partition` BIGINT METADATA VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'my_excellent_topic',
'properties.bootstrap.servers' = 'mykafka:9092',
'properties.group.id' = 'my_excellent_group',
'format' = 'csv',
'scan.startup.mode' = 'earliest-offset'
);
You can configure the field names and the parameters in the WITH clause based on your business requirements.
Metadata columns
Key | Data type | Description |
---|---|---|
topic | STRING NOT NULL METADATA VIRTUAL | The name of the topic to which the Message Queue for Apache Kafka message belongs. |
partition | INT NOT NULL METADATA VIRTUAL | The ID of the partition to which the Message Queue for Apache Kafka message belongs. |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | The headers of the Message Queue for Apache Kafka message. |
leader-epoch | INT NOT NULL METADATA VIRTUAL | The leader epoch of the Message Queue for Apache Kafka message. |
offset | BIGINT NOT NULL METADATA VIRTUAL | The offset of the Message Queue for Apache Kafka message. |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | The timestamp of the Message Queue for Apache Kafka message. |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | The timestamp type of the Message Queue for Apache Kafka message. Valid values: NoTimestampType, CreateTime, and LogAppendTime. |
- Metadata columns can be defined only in VVR 3.0.0 and later.
- If you want to use the table as a Message Queue for Apache Kafka result table and enable Flink to ignore the read-only metadata columns, you must declare these metadata columns as VIRTUAL when you define them in the source table. A usage sketch follows these notes.
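For reference, the following minimal sketch is not taken from this topic. It declares the partition, offset, and timestamp metadata columns as VIRTUAL and reads them together with the regular columns. The topic name demo_topic, the broker address, and the consumer group ID are placeholder values, and the column names msg_partition, msg_offset, and msg_ts are arbitrary aliases that are mapped to the metadata keys by the FROM clause.
CREATE TEMPORARY TABLE kafka_meta_source (
  id INT,
  name STRING,
  `msg_partition` INT METADATA FROM 'partition' VIRTUAL,
  `msg_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
  `msg_ts` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'demo_topic',  -- Placeholder topic name.
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);

-- VIRTUAL metadata columns can be queried like regular columns but are ignored when data is written.
SELECT msg_partition, msg_offset, msg_ts, id, name FROM kafka_meta_source;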
Data source for the CREATE TABLE AS statement
- Data type mappings
The following table lists the mappings between JSON and Flink SQL data types.
JSON data type | Flink SQL data type |
---|---|
BOOLEAN | BOOLEAN |
STRING | DATE, TIMESTAMP, TIMESTAMP_LTZ, TIME, or STRING |
INT or LONG | BIGINT |
BIGINT | DECIMAL or STRING. Note: The precision of the DECIMAL data type in Flink is limited. If the value of an integer exceeds the maximum precision of the DECIMAL type, Flink considers the data type of the value as STRING to prevent the loss of precision. |
FLOAT, DOUBLE, or BIG DECIMAL | DOUBLE |
ARRAY | STRING |
OBJECT | STRING |
Example
- JSON text
{ "id": 101, "name": "VVP", "properties": { "owner": "Alibaba Cloud", "engine": "Flink" } "type": ["Big data"] }
- Information about the table to which Flink writes data
id | name | properties | type |
---|---|---|---|
101 | "VVP" | { "owner": "Alibaba Cloud", "engine": "Flink" } | ["Big data"] |
- Auxiliary type inference
If the preceding rules do not meet your business requirements, you can declare the data type of a specific column in the DDL statement that is used to create the source table. If you use this method, Flink preferentially parses the field by using the data type that is declared for the column in the DDL statement. In the following example, Flink parses the price field as the DECIMAL type instead of converting the field to the DOUBLE type based on the data type mappings.
CREATE TABLE evolvingKafkaSource (
  price DECIMAL(18, 2)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'localhost:9092',
  'topic' = 'evolving_kafka_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
However, if the data type that you specify in the DDL statement is different from the actual data type, one of the following rules applies:
- If the declared data type of a column has a larger range than the actual data type, the field is parsed based on the declared data type. For example, if the column is declared as the DOUBLE type and the data in the column is of the BIGINT type, the data is parsed as DOUBLE.
- If the actual data type has a larger range than the declared data type, or the actual data type is incompatible with the declared data type, the CREATE TABLE AS statement cannot synchronize the data type change and an error is returned. To resolve this issue, you must restart your job and declare a valid data type to parse the data.
The following figure shows the ranges of data types and their compatibility.
In the preceding figure, the ranges of the data types that are closest to the root node are the largest. If two data types belong to different branches, the two data types are incompatible.
Note
- You cannot perform auxiliary type inference on complex data types, such as ROW, ARRAY, MAP, and MULTISET.
- By default, Flink parses complex data types as STRING.
You can use one of the following methods to process nested columns in a JSON text:
- Declare 'json.infer-schema.flatten-nested-columns.enable'='true' in the DDL statement that is used to create the source table to expand all elements in nested columns to the top level.
In this way, all nested columns are expanded in sequence. To prevent column name conflicts, Flink uses the path that indexes the column as the name of the expanded column.
Note No solution is provided to handle column name conflicts. If a column name conflict occurs, declare json.ignore-parse-errors as true in the DDL statement that is used to create the source table to ignore the data that has conflicts.
Example
- JSON text
{ "nested": { "inner": { "col": true } } }
- Information about the table to which Flink writes data
nested.inner.col |
---|
true |
- Add `rowkey` AS JSON_VALUE(`properties`, '$.rowkey') to the CREATE TABLE AS syntax in the DDL statement to add a computed column and specify the column that you want to expand. For more information, see Example 4.
Parameters in the WITH clause
Parameter | Description | Required | Data type | Remarks |
---|---|---|---|---|
connector | The type of the source table. | Yes | String | Set the value to kafka. |
topic | The name of the topic. | Yes | String | Separate multiple topic names with semicolons (;), such as topic-1;topic-2. Notice: The topic parameter cannot be used together with the topic-pattern parameter. |
topic-pattern | The regular expression that is used to match topics. When a job is running, all topics whose names match the specified regular expression are subscribed to by the consumer. | No | String | Notice: The topic-pattern parameter cannot be used together with the topic parameter. A usage sketch follows this table. |
properties.bootstrap.servers | The IP addresses or endpoints of Kafka brokers. | Yes | String | Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,). |
properties.group.id | The ID of a Kafka consumer group. | Yes | String | N/A. |
properties.* | The Kafka configurations. | No | String | The suffix must match a configuration item that is defined in the Kafka documentation. Flink removes the properties. prefix and passes the transformed keys and values to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation. The values of the key.deserializer and value.deserializer parameters are overridden by the Kafka configurations. Therefore, we recommend that you do not modify the settings of these two parameters by adding the properties. prefix. |
format | The format that the Message Queue for Apache Kafka connector uses to deserialize the value field in a Message Queue for Apache Kafka message. | Yes | String | Valid values: |
value.format | The format that the Message Queue for Apache Kafka connector uses to deserialize the value field in a Message Queue for Apache Kafka message. | Yes | String | Valid values: |
key.format | The format that the Message Queue for Apache Kafka connector uses to deserialize the key field in a Message Queue for Apache Kafka message. | No | String | Valid values: |
key.fields | The fields that are parsed from the key field in a Message Queue for Apache Kafka message. | No | String | Separate multiple field names with semicolons (;), such as field1;field2. By default, this parameter is not configured. In this case, the key field is not parsed and the key data is discarded. Note: Only VVR 3.0.0 and later support this parameter. |
key.fields-prefix | The custom prefix for all keys in Message Queue for Apache Kafka messages. You can specify this parameter to prevent name conflicts with the value fields. | No | String | The prefix is empty by default. If a prefix is defined, the prefix must be added to the names of the fields in the table schema and to the fields that are specified in the key.fields parameter. When a key field is constructed, the prefix is removed and the name without the prefix is used. |
value.fields-include | Specifies whether to include the key field when the value field is parsed. | No | String | Valid values: ALL and EXCEPT_KEY. Note: Only VVR 3.0.0 and later support this parameter. |
scan.startup.mode | The start offset from which Message Queue for Apache Kafka data is read. | No | String | Valid values: earliest-offset, latest-offset, group-offsets, timestamp, and specific-offsets. |
scan.startup.specific-offsets | The start offset of each partition when the scan.startup.mode parameter is set to specific-offsets. | No | String | Example: partition:0,offset:42;partition:1,offset:300 |
scan.startup.timestamp-millis | The timestamp of the start offset when the scan.startup.mode parameter is set to timestamp. | No | Long | Unit: milliseconds. |
value.fields-prefix | A custom prefix for all values in Message Queue for Apache Kafka messages. You can specify this parameter to prevent name conflicts with the key fields or metadata fields. | No | String | The prefix is empty by default. If a prefix is defined, the prefix must be added to the names of the fields in the table schema. When a value field is constructed, the prefix is removed and the name without the prefix is used. Note: Only Flink that uses VVR 4.0.12 or later supports this parameter. |
json.infer-schema.flatten-nested-columns.enable | Specifies whether to recursively expand nested columns in a JSON text. | No | Boolean | Valid values: true and false. Note: This parameter takes effect only when you execute the CREATE TABLE AS statement to synchronize data from a Message Queue for Apache Kafka source table. |
json.infer-schema.primitive-as-string | Specifies whether to infer all basic data types as STRING. | No | Boolean | Valid values: true and false. Note: This parameter takes effect only when you execute the CREATE TABLE AS statement to synchronize data from a Message Queue for Apache Kafka source table. |
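For reference, the following minimal sketch is not taken from this topic. It shows how the topic-pattern parameter can replace the topic parameter so that the consumer subscribes to every topic whose name matches a regular expression. The pattern order-topic-.*, the broker address, and the consumer group ID are placeholder values.
CREATE TEMPORARY TABLE kafka_pattern_source (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic-pattern' = 'order-topic-.*',  -- Placeholder regular expression. Do not configure the topic parameter at the same time.
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);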
To directly configure the Kafka consumer, add the properties prefix before the configuration parameters of the Kafka consumer and append the configurations to the parameters in the WITH clause. The following sample code shows how to configure a Message Queue for Apache Kafka cluster that requires Simple Authentication and Security Layer (SASL) authentication.
CREATE TABLE kafkaTable (
...
) WITH (
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);
Examples
- Example 1: Read data from a Kafka topic and insert the data into another Kafka topic.
Flink reads data from a topic named source in Message Queue for Apache Kafka and then writes the data to a topic named sink. The data is in the CSV format.
CREATE TEMPORARY TABLE Kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE Kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

INSERT INTO Kafka_sink
SELECT id, name, age FROM Kafka_source;
- Example 2: Synchronize the table schema and data.
Flink synchronizes messages from a Kafka topic to Hologres in real time. In this case, you can use the offset and partition IDs of Message Queue for Apache Kafka messages as primary keys. This way, if a failover occurs, no duplicate messages exist in Hologres.
CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true' -- Optional. Expand all nested columns.
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH ('connector' = 'hologres')
AS TABLE vvp.`default`.kafkaTable;
- Example 3: Synchronize the table schema and data in the key and value columns of Message Queue for Apache Kafka messages.
The key fields of Message Queue for Apache Kafka messages store relevant information. You can synchronize data in the key and value columns of Message Queue for Apache Kafka messages at the same time.
CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH ('connector' = 'hologres')
AS TABLE vvp.`default`.kafkaTable;
Note The key columns in a Kafka message do not support table schema changes and type inference. Manual declaration is required.
- Example 4: Synchronize the table schema and data and perform calculation.
When you synchronize data from Kafka to Hologres, lightweight calculation is required.
CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH ('connector' = 'hologres')
AS TABLE vvp.`default`.kafkaTable
ADD COLUMN `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default'); -- Use COALESCE to handle null values.
FAQ
- How do I obtain JSON data by using fully managed Flink?
- Fully managed Flink is connected to Message Queue for Apache Kafka, but cannot read data from or write data to Message Queue for Apache Kafka. What do I do?
- What is the purpose of the commit offset mechanism in fully managed Flink?
- Why does the error message "timeout expired while fetching topic metadata" appear even if a network connection is established between fully managed Flink and Message Queue for Apache Kafka?
- After the data of a Message Queue for Apache Kafka source table is calculated by using event time-based window functions, no data output is returned. Why?