This topic provides examples of how to use the Confluent Avro format and describes the parameters and data type mappings of the Confluent Avro format.
Background information
The Confluent Avro format allows you to read data records that are serialized by io.confluent.kafka.serializers.KafkaAvroSerializer and to write data records that can be deserialized by io.confluent.kafka.serializers.KafkaAvroDeserializer.
When you read a data record in the Confluent Avro format, the Avro write schema is obtained from the configured Confluent Schema Registry based on the schema version ID that is encoded in the data record, and the read schema is inferred from the table schema.
When you write a data record in the Confluent Avro format, the Avro writer schema is inferred from the table schema and is used to look up the schema ID that is encoded into the data record. The lookup is performed in the configured Confluent Schema Registry under the subject that is specified by the avro-confluent.subject parameter.
Connectors that support the Confluent Avro format include the Apache Kafka connector and the Upsert Kafka connector.
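For reference, the following minimal sketch shows how the Confluent Avro format might be declared as the overall value format of a Kafka table. The topic name, Kafka broker address, and Schema Registry URL are placeholders that you must replace with your own values.
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  -- Placeholder topic name and Kafka broker address.
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  -- Use the Confluent Avro format for Kafka values and specify the Schema Registry URL (placeholder).
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8082'
);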
Sample code
The following sample code provides examples on how to create a table in the Confluent Avro format by using the Apache Kafka and Upsert Kafka connectors.
Example 1: Create a table that uses the original UTF-8 string as the Kafka key and Avro records registered in Confluent Schema Registry as Kafka values by using the Apache Kafka connector.
CREATE TABLE user_created (
-- Map the column to the original UTF-8 string that is used as the Kafka key.
the_kafka_key STRING,
-- Map the columns to the Avro fields that are used as Kafka values.
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example1',
'properties.bootstrap.servers' = 'localhost:9092',
-- Configure the Kafka key in the UTF-8 string format and use the column named the_kafka_key in the table.
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
);
The following INSERT statement writes data to the Kafka table:
INSERT INTO user_created
SELECT
-- Copy the user ID to the column that is mapped to the Kafka key.
id as the_kafka_key,
-- Obtain all Kafka values.
id, name, email
FROM some_table;
Example 2: Create a table that uses the Avro records registered in Confluent Schema Registry as the Kafka keys and Kafka values by using the Apache Kafka connector.
CREATE TABLE user_created (
-- Map the column to the Avro field id that is used as the Kafka key.
kafka_key_id STRING,
-- Map the columns to the Avro fields that are used as Kafka values.
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example2',
'properties.bootstrap.servers' = 'localhost:9092',
-- Note: In most cases, schema upgrades are not backward or forward compatible in Kafka keys due to hash partitioning.
'key.format' = 'avro-confluent',
'key.avro-confluent.url' = 'http://localhost:8082',
'key.fields' = 'kafka_key_id',
-- In this example, both the Kafka key and the Kafka value in the Avro format contain the id field.
-- Add a prefix to the column that is associated with the Kafka key field in the table to avoid conflicts.
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY',
-- Since Flink 1.13, the subject has a default value, which can be overridden.
'key.avro-confluent.subject' = 'user_events_example2-key2',
'value.avro-confluent.subject' = 'user_events_example2-value2'
);
Example 3: Create a table that uses Avro records registered in Confluent Schema Registry as Kafka values by using the Upsert Kafka connector.
CREATE TABLE user_created (
-- Map the column to the original UTF-8 string that is used as the Kafka key.
kafka_key_id STRING,
-- Map the columns to the Avro fields that are used as Kafka values.
id STRING,
name STRING,
email STRING,
-- Specify a primary key to define the UPSERT behavior when the Upsert Kafka connector is used.
PRIMARY KEY (kafka_key_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user_events_example3',
'properties.bootstrap.servers' = 'localhost:9092',
-- Use UTF-8 strings as Kafka keys.
-- In this example, the key.fields parameter is not specified because the value of this parameter is determined by the primary key of the table.
'key.format' = 'raw',
-- In this example, both the Kafka key and the Kafka value in the Avro format contain the id field.
-- Add a prefix to the column that is associated with the Kafka key field in the table to avoid conflicts.
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
);
Parameters
Parameter | Required | Default value | Data type | Description |
format | Yes | No default value | STRING | The format that you declare to use. To use the Confluent Avro format, set this parameter to avro-confluent. |
avro-confluent.basic-auth.credentials-source | No | No default value | STRING | The source of basic authentication credentials for Confluent Schema Registry. |
avro-confluent.basic-auth.user-info | No | No default value | STRING | The basic authentication user information of Confluent Schema Registry. |
avro-confluent.bearer-auth.credentials-source | No | No default value | STRING | The source of bearer authentication credentials for Confluent Schema Registry. |
avro-confluent.bearer-auth.token | No | No default value | STRING | The bearer authentication token for Confluent Schema Registry. |
avro-confluent.properties | No | No default value | MAP | A map of properties that is forwarded to Confluent Schema Registry. This parameter is suitable for options that are not officially exposed as Flink configuration options. Important: Flink configuration options take precedence over this parameter. |
avro-confluent.ssl.keystore.location | No | No default value | STRING | The location of the SSL keystore. |
avro-confluent.ssl.keystore.password | No | No default value | STRING | The password of the SSL keystore. |
avro-confluent.ssl.truststore.location | No | No default value | STRING | The location of the SSL truststore. |
avro-confluent.ssl.truststore.password | No | No default value | STRING | The password of the SSL truststore. |
avro-confluent.subject | No | No default value | STRING | The Confluent Schema Registry subject under which the schema used by the Confluent Avro format during serialization is registered. By default, the Apache Kafka and Upsert Kafka connectors use <topic_name>-value or <topic_name>-key as the subject name if the Confluent Avro format is used as the value or key format. If you use the file system connector for a result table, this parameter is required. |
avro-confluent.url | Yes | No default value | STRING | The URL of Confluent Schema Registry to obtain or register schemas. |
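The following sketch shows how the subject and basic authentication parameters might be combined. The Schema Registry URL, credentials, topic, and subject name are placeholder values, and USER_INFO is only one of the credential sources that Confluent Schema Registry supports.
CREATE TABLE secured_user_events (
  id STRING,
  name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  -- Placeholder topic name and Kafka broker address.
  'topic' = 'user_events_secured',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  -- Placeholder Schema Registry URL.
  'avro-confluent.url' = 'https://localhost:8082',
  -- Read the basic authentication credentials from the user-info setting (placeholder credentials).
  'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'avro-confluent.basic-auth.user-info' = 'registry_user:registry_password',
  -- Override the default subject name (<topic_name>-value).
  'avro-confluent.subject' = 'user_events_secured-value'
);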
Data type mappings
Data type mappings between Flink SQL data types and Confluent Avro data types are similar to data type mappings between Flink SQL data types and Avro data types. The following table describes the mappings between Flink SQL data types and Avro data types.
Flink SQL data type | Avro data type |
CHAR, VARCHAR, and STRING | STRING |
BOOLEAN | BOOLEAN |
BINARY and VARBINARY | BYTES |
DECIMAL | FIXED Note The value of this data type is a decimal number with precision. |
TINYINT | INT |
SMALLINT | INT |
INT | INT |
BIGINT | LONG |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | INT Note The value of this data type is a date. |
TIME | INT Note The value of this data type is the time of day in milliseconds. |
TIMESTAMP | LONG Note The value of this data type is a timestamp in milliseconds. |
ARRAY | ARRAY |
MAP Note The map keys must be of the STRING, CHAR, or VARCHAR data type. | MAP
MULTISET Note The elements must be of the STRING, CHAR, or VARCHAR data type. | MAP |
ROW | RECORD |
In addition to the data types listed in the preceding table, Flink can read and write nullable data. A nullable data type is mapped to Avro union(something, null), where something is the Avro data type that is converted from the corresponding Flink data type.
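For example, the following sketch shows a table with one NOT NULL column and one nullable column. The connector settings are placeholders; the comments describe the Avro types that the columns map to based on the preceding rules.
CREATE TABLE nullable_example (
  -- NOT NULL maps to the plain Avro type "string".
  id STRING NOT NULL,
  -- A nullable column maps to the Avro union (string, null).
  email STRING
) WITH (
  'connector' = 'kafka',
  -- Placeholder topic name, Kafka broker address, and Schema Registry URL.
  'topic' = 'nullable_example',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8082'
);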
For more information about Avro data types, see the Apache Avro specification.