This topic provides the DDL syntax that is used to create a Message Queue for Apache Kafka source table, describes the parameters in the WITH clause and the metadata columns, and provides examples.

What is a Message Queue for Apache Kafka source table?

Message Queue for Apache Kafka is a distributed, high-throughput, and scalable message queue service provided by Alibaba Cloud. This service is widely used in big data applications, such as log collection, monitoring data aggregation, streaming data processing, and both online and offline data analysis.

Prerequisites

  • Resources are created in the Message Queue for Apache Kafka console. For more information, see Step 3: Create resources.
  • A whitelist is configured for the Message Queue for Apache Kafka cluster. For more information, see Configure the whitelist.

Limits

  • Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports Message Queue for Apache Kafka connectors.
  • A Message Queue for Apache Kafka connector can read only data of Kafka 0.10 or later.
  • A Message Queue for Apache Kafka connector supports only the consumer configuration items of Kafka 2.4. For more information, see Consumer Configs in the Kafka 2.4 documentation.
  • Only the Flink compute engine of vvr-4.0.12-flink-1.13 or later allows you to execute the CREATE TABLE AS statement to synchronize data from a Message Queue for Apache Kafka source table.
  • Flink can infer the data types of columns and change the schema only for tables in the JSON format. Flink cannot perform these operations on tables in other formats.
  • If you execute the CREATE TABLE AS statement to synchronize data from a Message Queue for Apache Kafka source table, you can synchronize JSON changes only to Hudi and Hologres result tables.
  • Flink can infer the data types only of value columns in a Message Queue for Apache Kafka source table and change the schema only for the value columns in the table. To synchronize data of the key columns in a Message Queue for Apache Kafka source table, you must specify the columns in the DDL statement. For more information, see Example 3.

DDL syntax

The following sample code shows how to use DDL statements to create a Message Queue for Apache Kafka source table. In this example, the message format is CSV and the table contains four data fields and two metadata columns.
CREATE TABLE kafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `topic` STRING METADATA VIRTUAL,
  `partition` BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_excellent_topic',
  'properties.bootstrap.servers' = 'mykafka:9092',
  'properties.group.id' = 'my_excellent_group',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);
You can configure the field names and parameters in the WITH clause based on your business requirements.

Metadata columns

You can define metadata columns in the Message Queue for Apache Kafka source table to obtain the metadata of Message Queue for Apache Kafka messages. For example, if multiple topics are defined in the WITH clause and the topic metadata column is defined in the source table, the metadata column indicates the topic from which each message is read. A sample DDL statement is provided after the following table and note.
Key Data type Description
topic STRING NOT NULL METADATA VIRTUAL The name of the topic to which the Message Queue for Apache Kafka message belongs.
partition INT NOT NULL METADATA VIRTUAL The ID of the partition to which the Message Queue for Apache Kafka message belongs.
headers MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL The headers of the Message Queue for Apache Kafka message.
leader-epoch INT NOT NULL METADATA VIRTUAL The leader epoch of the Message Queue for Apache Kafka message.
offset BIGINT NOT NULL METADATA VIRTUAL The offset of the Message Queue for Apache Kafka message.
timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL The timestamp of the Message Queue for Apache Kafka message.
timestamp-type STRING NOT NULL METADATA VIRTUAL The timestamp type of the Message Queue for Apache Kafka message. Valid values:
  • NoTimestampType: indicates that no timestamp is defined in the message.
  • CreateTime: indicates the time when the message was generated.
  • LogAppendTime: indicates the time when the message was added to Kafka brokers.
Note
  • Metadata columns can be defined only in VVR 3.0.0 and later.
  • If the table is also used as a Message Queue for Apache Kafka result table, you must declare the read-only metadata columns as VIRTUAL when you define the metadata columns so that Flink ignores these columns when data is written.
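The following minimal sketch shows how metadata columns can be used. The topic names, the field name, and the use of the raw format are illustrative assumptions; the table reads from two topics and exposes the topic, partition, and offset of each message as virtual metadata columns.
CREATE TEMPORARY TABLE kafka_metadata_demo (
  `raw_value` STRING,                                -- message value read as a single string
  `topic` STRING NOT NULL METADATA VIRTUAL,          -- topic that the message was read from
  `part` INT NOT NULL METADATA FROM 'partition' VIRTUAL,
  `offset` BIGINT NOT NULL METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-1;topic-2',                       -- hypothetical topic names
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'raw',
  'scan.startup.mode' = 'earliest-offset'
);

-- The topic metadata column indicates which of the two topics each row was read from.
SELECT `topic`, `part`, `offset`, `raw_value` FROM kafka_metadata_demo;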

Data source for the CREATE TABLE AS statement

The CREATE TABLE AS statement can be executed to synchronize data from a Message Queue for Apache Kafka source table that is in the JSON format. If specific fields do not exist in a predefined table schema during data synchronization, Flink attempts to automatically infer the data types of the columns. If the data types that are obtained after automatic type inference do not meet your business requirements, you can perform auxiliary type inference to declare the data types of the columns. The following section describes the type inference methods and the methods to process nested types.
Note For more information about the JSON format, see JSON Format.
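The following minimal sketch shows the basic pattern that this section assumes: a JSON-format source table that declares only one column, synchronized to a Hologres result table by using the CREATE TABLE AS statement. The topic and table names are hypothetical assumptions; Example 2 shows a complete version.
CREATE TEMPORARY TABLE kafka_ctas_source (
  `id` BIGINT NOT NULL  -- declared explicitly; the remaining value columns are inferred from the JSON data
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'ctas_demo',  -- hypothetical topic name
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`ctas_demo_sync`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafka_ctas_source;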
By default, Flink expands only the first level of the JSON text during type inference. Flink infers the Flink SQL data types based on the JSON data types and values by using the data type mappings in the following table. You can also declare the data types of specific columns in the DDL statement to meet your business requirements. This method is called auxiliary type inference.
  • Data type mappings
    The following table lists the mappings between JSON and Flink SQL data types.
    JSON data type                   Flink SQL data type
    BOOLEAN                          BOOLEAN
    STRING                           DATE, TIMESTAMP, TIMESTAMP_LTZ, TIME, or STRING
    INT or LONG                      BIGINT
    BIGINT                           DECIMAL or STRING
    FLOAT, DOUBLE, or BIG DECIMAL    DOUBLE
    ARRAY                            STRING
    OBJECT                           STRING
    Note The precision of the DECIMAL data type in Flink is limited. If the value of an integer exceeds the maximum precision of the DECIMAL type, Flink considers the data type of the value as STRING to prevent the loss of precision.
    Example
    • JSON text
      {
        "id": 101,
        "name": "VVP",
        "properties": {
          "owner": "Alibaba Cloud",
          "engine": "Flink"
        },
        "type": ["Big data"]
      }
    • Information about the table to which Flink writes data
      id: 101
      name: "VVP"
      properties: {"owner": "Alibaba Cloud", "engine": "Flink"}
      type: ["Big data"]
  • Auxiliary type inference
    If the preceding rules do not meet your business requirements, you can declare the data type of a specific column in the DDL statement that is used to create the source table. If you use this method, Flink preferentially parses the field by using the data type that you declare in the DDL statement. In the following example, Flink parses the price field as the DECIMAL type instead of converting the field to the DOUBLE type based on the data type mappings.
    CREATE TABLE evolvingKafkaSource (
      price DECIMAL(18, 2)
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'localhost:9092',
      'topic' = 'evolving_kafka_demo',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    );
     However, if the data type that you declare in the DDL statement is different from the actual data type of the field, the mismatch is handled in one of the following ways:
    • If the declared data type of a column has a larger range than the actual data type, the field that you want to use is parsed based on the declared data type.

      For example, if the column is declared as the DOUBLE type and data in the column is of the BIGINT type, the data type is parsed as DOUBLE.

    • If the actual data type has a larger range than the declared data type or the actual data type is incompatible with the declared data type, you cannot execute the CREATE TABLE AS statement to synchronize data type changes. In this case, an error is returned. To resolve this issue, you must restart your job and declare a valid data type to parse data.
      The following figure shows the ranges of data types and their compatibility.
      (Figure: data type ranges and compatibility.)
      In the figure, the data types that are closest to the root node have the largest ranges. If two data types belong to different branches, the two data types are incompatible.
      Note
      • You cannot perform auxiliary type inference on complex data types, such as ROW, ARRAY, MAP, and MULTISET.
      • By default, Flink parses complex data types as STRING.
In most cases, JSON text in Kafka topics has a nested structure. If you want to extract nested columns from JSON text, you can use one of the following methods:
  • Declare 'json.infer-schema.flatten-nested-columns.enable'='true' in the DDL statement that is used to create the source table to expand all elements in nested columns to the top level.
    In this way, all nested columns are expanded in sequence. To prevent column name conflicts, Flink uses the path that indexes the column as the name of the column after the column is expanded.
    Note Column name conflicts are not automatically resolved. If a column name conflict occurs, declare 'json.ignore-parse-errors' = 'true' in the DDL statement that is used to create the source table to ignore the data that has conflicts.
    Example
    • JSON text
      {
        "nested": {
          "inner": {
            "col": true
          }
        }
      }
    • Information about the table to which Flink writes data
      nested.inner.col: true
  • Add `rowkey` AS JSON_VALUE(`properties`, '$.rowkey') to the CREATE TABLE AS syntax in the DDL statement to add a computed column and specify the nested field that you want to extract.

    For more information, see Example 4. A minimal sketch is also provided after this list.
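As a sketch of the second method, the following statements declare the nested object as a STRING column and add a computed column that extracts nested.inner.col from the JSON text shown above. The topic, table, and column names are hypothetical assumptions.
CREATE TEMPORARY TABLE kafkaNestedTable (
  `nested` STRING  -- the top-level JSON object is parsed as a STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'nested_json_demo',  -- hypothetical topic name
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_nested` WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaNestedTable
ADD COLUMN
  `inner_col` AS JSON_VALUE(`nested`, '$.inner.col');  -- extract the nested field as a top-level column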

Parameters in the WITH clause

Parameter Description Required Data type Remarks
connector The type of the source table. Yes String Set the value to kafka.
topic The name of the topic. Yes String Separate multiple topic names with semicolons (;), such as topic-1;topic-2.
Notice The topic parameter cannot be used together with the topic-pattern parameter.
topic-pattern The regular expression that is used to match topics. When a job is running, the consumer subscribes to all topics whose names match the specified regular expression. No String
Notice
  • Only VVR 3.0.0 and later support this parameter.
  • The topic parameter cannot be used together with the topic-pattern parameter.
properties.bootstrap.servers The IP addresses or endpoints of Kafka brokers. Yes String Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).
properties.group.id The ID of a Kafka consumer group. Yes String N/A.
properties.* The Kafka configurations. No String The suffix must match the configuration defined in the Kafka documentation. Flink removes the properties. prefix and passes the transformed key and values to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation.

The connector overrides the values of the 'key.deserializer' and 'value.deserializer' parameters. Therefore, we recommend that you do not configure these two parameters by using the properties. prefix.

format The format that the Message Queue for Apache Kafka connector uses to deserialize the value field in a Message Queue for Apache Kafka message. Yes String Valid values:
  • csv
  • json
  • avro
  • debezium-json
  • canal-json
  • maxwell-json
  • avro-confluent
  • raw
Note
  • Only VVR 3.0.0 and later support maxwell-json, avro-confluent, and raw formats.
  • The format parameter cannot be used together with the value.format parameter. If you use the two parameters at the same time, a conflict occurs.
value.format The format that the Message Queue for Apache Kafka connector uses to deserialize the value field in a Message Queue for Apache Kafka message. Yes String Valid values:
  • csv
  • json
  • avro
  • debezium-json
  • canal-json
  • maxwell-json
  • avro-confluent
  • raw
Note
  • Only VVR 3.0.0 and later support maxwell-json, avro-confluent, and raw formats.
  • The format parameter cannot be used together with the value.format parameter. If you use the two parameters at the same time, a conflict occurs.
key.format The format that the Message Queue for Apache Kafka connector uses to deserialize the key field in a Message Queue for Apache Kafka message. No String Valid values:
  • csv
  • json
  • avro
  • debezium-json
  • canal-json
  • maxwell-json
  • avro-confluent
  • raw
Note
  • Only VVR 3.0.0 and later support maxwell-json, avro-confluent, and raw formats.
  • If the key.format parameter is specified, you must also specify the key.fields parameter.
key.fields The fields that are parsed from the key field in a Message Queue for Apache Kafka message. No String Separate multiple field names with semicolons (;), such as field1;field2. By default, this parameter is not configured. Therefore, the key field is not parsed and the key data is discarded.
Note Only VVR 3.0.0 and later support this parameter.
key.fields-prefix The custom prefix for all key fields in Message Queue for Apache Kafka messages. You can specify this parameter to prevent name conflicts with the value fields. No String The prefix is empty by default. If a prefix is defined, both the field names in the table schema and the field names that are specified in the key.fields parameter must use the prefix.

When the data type of the key format is constructed, the prefix is removed and the names without the prefix are used.

Notice
  • Only VVR 3.0.0 and later support this parameter.
  • If you specify this parameter, you must set value.fields-include to EXCEPT_KEY.
value.fields-include Specifies whether to include the key field when the value field is parsed. No String Valid values:
  • ALL: All defined fields can store the data that is parsed from the value field. This is the default value.
  • EXCEPT_KEY: All defined fields except those specified by the key.fields parameter can store the data that is parsed from the value field.
Note Only VVR 3.0.0 and later support this parameter.
scan.startup.mode The start offset for Message Queue for Apache Kafka to read data. No String Valid values:
  • earliest-offset: Message Queue for Apache Kafka reads data from the earliest offset.
  • latest-offset: Message Queue for Apache Kafka reads data from the latest offset.
  • group-offsets: Message Queue for Apache Kafka reads data from the offsets that are committed by the consumer group specified by properties.group.id. This is the default value.
  • timestamp: Message Queue for Apache Kafka reads data from a specified time.

    You must specify the scan.startup.timestamp-millis parameter in the WITH clause.

  • specific-offsets: reads data from a specified partition of Message Queue for Apache Kafka at a specified offset.

    You must specify the scan.startup.specific-offsets parameter in the WITH clause.

scan.startup.specific-offsets The start offset of each partition when the scan.startup.mode parameter is set to specific-offsets. No String Example: partition:0,offset:42;partition:1,offset:300
scan.startup.timestamp-millis The timestamp of the start offset when the scan.startup.mode parameter is set to timestamp. No Long Unit: milliseconds.
value.fields-prefix A custom prefix for all value fields in Message Queue for Apache Kafka messages. You can specify this parameter to prevent name conflicts with the key fields or metadata fields. No String If a prefix is defined, the prefix must be added to the names of the fields in the table schema. When the value fields are constructed, the prefix is removed and the names without the prefix are used.
Note Only Flink that uses VVR 4.0.12 or later supports this parameter.

The prefix is empty by default.

json.infer-schema.flatten-nested-columns.enable Specifies whether to recursively expand nested columns in a JSON text. No Boolean Valid values:
  • true: Nested columns are recursively expanded.

    Flink uses the path that indexes the value of the column that is expanded as the name of the column. For example, the column col in the JSON text {"nested": {"col": true}} is named nested.col after the column is expanded.

  • false: Nested types are parsed as the STRING type. This is the default value.
Note This parameter takes effect only when you execute the CREATE TABLE AS statement to synchronize data from a Message Queue for Apache Kafka source table.
json.infer-schema.primitive-as-string Specifies whether to infer all basic types as STRING. No Boolean Valid values:
  • true: All basic types are inferred as the STRING type.
  • false: Data types are inferred based on the data type mappings. This is the default value.
Note This parameter takes effect only when you execute the CREATE TABLE AS statement to synchronize data from a Message Queue for Apache Kafka source table.
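As an example of the startup parameters described above, the following sketch starts reading from a specific point in time by setting scan.startup.mode to timestamp together with scan.startup.timestamp-millis. The topic, consumer group, and timestamp value are hypothetical assumptions.
CREATE TEMPORARY TABLE kafkaStartFromTimestamp (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_excellent_topic',
  'properties.bootstrap.servers' = 'mykafka:9092',
  'properties.group.id' = 'my_excellent_group',
  'format' = 'csv',
  'scan.startup.mode' = 'timestamp',
  -- Hypothetical start time in milliseconds. Messages whose timestamps are greater than or equal to this value are read.
  'scan.startup.timestamp-millis' = '1655395200000'
);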
For more information about how to configure the parameters of the Kafka consumer, see Consumer Configs in the Kafka 2.4 documentation. To directly configure the Kafka consumer that is used by the connector, add the properties. prefix before the configuration parameters of the Kafka consumer and append the configurations to the parameters in the WITH clause. The following sample code shows how to configure the connector for a Message Queue for Apache Kafka cluster that requires Simple Authentication and Security Layer (SASL) authentication.
CREATE TABLE kafkaTable (
    ...
) WITH (
    ...
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);
Note Only per-job clusters support Kafka authentication.

Examples

  • Example 1: Read data from a Kafka topic and insert the data into another Kafka topic.
    Flink reads data from a topic named source in Message Queue for Apache Kafka and then writes the data to a topic named sink. The data is in the CSV format.
    CREATE TEMPORARY TABLE Kafka_source (
      id INT,
      name STRING,
      age INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'source',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE Kafka_sink (
      id INT,
      name STRING,
      age INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'sink',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    INSERT INTO Kafka_sink SELECT id, name, age FROM Kafka_source;
  • Example 2: Synchronize the table schema and data.
    Flink synchronizes messages from a Kafka topic to Hologres in real time. In this case, you can use the offset and partition IDs of Message Queue for Apache Kafka messages as primary keys. This way, if a failover occurs, no duplicate messages exist in Hologres.
    CREATE TEMPORARY TABLE kafkaTable (
      `offset` INT NOT NULL METADATA,
      `part` BIGINT NOT NULL METADATA FROM 'partition',
      PRIMARY KEY (`part`, `offset`) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'topic' = 'kafka_evolution_demo',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json',
      'json.infer-schema.flatten-nested-columns.enable' = 'true'
        -- Optional. Expand all nested columns. 
    );
    
    CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
    WITH (
      'connector' = 'hologres'
    ) AS TABLE vvp.`default`.kafkaTable;
  • Example 3: Synchronize the table schema and data in the key and value columns of Message Queue for Apache Kafka messages.
    The key fields of Message Queue for Apache Kafka messages store relevant information. You can synchronize data in the key and value columns of Message Queue for Apache Kafka messages at the same time.
    CREATE TEMPORARY TABLE kafkaTable (
      `key_id` INT NOT NULL,
      `val_name` VARCHAR(200)
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'topic' = 'kafka_evolution_demo',
      'scan.startup.mode' = 'earliest-offset',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields' = 'key_id',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'val_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
    WITH (
      'connector' = 'hologres'
    ) AS TABLE vvp.`default`.kafkaTable;
    Note The key columns of Message Queue for Apache Kafka messages do not support table schema changes or type inference. You must declare the key columns manually.
  • Example 4: Synchronize the table schema and data and perform calculation.
    When you synchronize data from Kafka to Hologres, lightweight calculation is required.
    CREATE TEMPORARY TABLE kafkaTable (
      `distinct_id` INT NOT NULL,
      `properties` STRING,
      `timestamp` TIMESTAMP METADATA,
      `date` AS CAST(`timestamp` AS DATE)
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'topic' = 'kafka_evolution_demo',
      'scan.startup.mode' = 'earliest-offset',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields' = 'key_id',
      'key.fields-prefix' = 'key_'
    );
    
    CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
       'connector' = 'hologres'
    ) AS TABLE vvp.`default`.kafkaTable
    ADD COLUMN
      `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
    -- Use COALESCE to handle null values. 

FAQ