This topic provides the DDL syntax that is used to create a Message Queue for Apache Kafka result table, describes the parameters in the WITH clause, and provides sample code.

What is Message Queue for Apache Kafka?

Message Queue for Apache Kafka is a distributed, high-throughput, and scalable message queue service provided by Alibaba Cloud. This service is widely used in big data applications, such as log collection, monitoring data aggregation, streaming data processing, and both online and offline data analysis.

Prerequisites

  • Resources are created in the Message Queue for Apache Kafka console. For more information, see Step 3: Create resources.
  • A whitelist is configured for a Message Queue for Apache Kafka instance. For more information, see Configure the whitelist.

Limits

  • Message Queue for Apache Kafka connectors are supported only by Flink that uses Ververica Runtime (VVR) 2.0.0 or later.
  • Message Queue for Apache Kafka connectors can write result data only to Kafka 0.10 or later.

DDL syntax

CREATE TABLE kafka_sink (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'csv'
);
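
After the result table is created, you write data to it with an INSERT INTO statement. The following minimal sketch assumes a hypothetical datagen source table that generates random rows with the same schema; it only illustrates how the result table is used.

-- Hypothetical source table that uses the built-in datagen connector
-- to generate random rows with the same schema as kafka_sink.
CREATE TEMPORARY TABLE datagen_source (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen'
);

-- Write the generated rows to the Kafka result table.
INSERT INTO kafka_sink
SELECT user_id, item_id, category_id, behavior, ts FROM datagen_source;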

Parameters in the WITH clause

  • connector: The type of the result table. Required. Data type: STRING. Set the value to kafka.
  • topic: The Kafka topic that the result table writes to. Required. Data type: STRING.
  • properties.bootstrap.servers: The IP addresses or endpoints and port numbers of the Kafka brokers. Required. Data type: STRING. Specify each broker in the host:port format and separate multiple brokers with commas (,): host:port,host:port,host:port.
  • format: The format that a Message Queue for Apache Kafka connector uses to serialize messages before it writes them to Message Queue for Apache Kafka. Required. Data type: STRING. Valid values: csv, json, and avro.
  • sink.partitioner: The mapping pattern between Flink partitions and Message Queue for Apache Kafka partitions. Optional. Data type: STRING. Valid values:
      • fixed: Each Flink partition is mapped to at most one Message Queue for Apache Kafka partition.
      • round-robin: Data in each Flink partition is distributed to the Message Queue for Apache Kafka partitions in turn.
      • A custom partitioner: If the fixed and round-robin patterns do not meet your requirements, you can create a subclass of FlinkKafkaPartitioner to customize the mapping pattern, such as org.mycompany.MyPartitioner. A sketch that shows how these parameters fit into the WITH clause follows this list.
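The following sketch shows where the format and sink.partitioner parameters fit into the WITH clause. The json format and the fixed pattern are chosen only as examples; the topic and broker values are placeholders.

CREATE TABLE kafka_json_sink (
  user_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',            -- serialize rows as JSON instead of CSV
  'sink.partitioner' = 'fixed'  -- each Flink partition writes to one Kafka partition
);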
To directly configure the Kafka producer that the connector uses, add the properties prefix before Kafka producer configuration parameters and append these parameters to the WITH clause. The following sample code shows how to configure the connector for a Message Queue for Apache Kafka cluster that requires Simple Authentication and Security Layer (SASL) authentication.
CREATE TABLE kafkaTable (
    ...
) WITH (
    ...
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);
For more information about the parameters configured for Kafka producers, see Producer Configs.
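SASL authentication is not the only use of the properties prefix. As another sketch, the following WITH clause passes the standard Kafka producer parameters acks and batch.size to tune write behavior; the values shown are examples, not recommendations.

CREATE TABLE kafkaTable (
    ...
) WITH (
    ...
    'properties.acks' = 'all',          -- wait for all in-sync replicas to acknowledge each write
    'properties.batch.size' = '32768'   -- maximum producer batch size in bytes
);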

Sample code on how to read data from a Kafka topic and insert the data into another Kafka topic

In this example, Flink reads data in the CSV format from a topic named source in Message Queue for Apache Kafka and writes the data to a topic named sink. Replace the placeholders in the following statements with your own topic names, broker endpoints, and consumer group ID.
CREATE TEMPORARY TABLE kafka_source (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'kafka',
    'topic' = '<yourSourceTopicName>',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'properties.group.id' = '<yourPropertiesGroupid>',
    'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
    id INT,
    name STRING,
    age INT
) WITH (
    'connector' = 'kafka',
    'topic' = '<yourSinkTopicName>',
    'properties.bootstrap.servers' = '<yourKafkaBrokers>',
    'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;