
Realtime Compute for Apache Flink: Confluent Avro

Last Updated: Aug 29, 2024

This topic provides examples on how to use the Confluent Avro format and describes the parameters and data type mappings of the format.

Background information

The Confluent Avro format allows you to read data records that were serialized by io.confluent.kafka.serializers.KafkaAvroSerializer and to write data records that can be read by io.confluent.kafka.serializers.KafkaAvroDeserializer.

  • When you read a data record in the Confluent Avro format, the Avro write schema is obtained from the configured Confluent Schema Registry based on the schema ID that is encoded in the data record (each serialized record carries a magic byte followed by a 4-byte schema ID), and the read schema is inferred from the table schema.

  • When you write a data record in the Confluent Avro format, the Avro schema is inferred from the table schema and is used to look up the schema ID that is encoded into the data record. The lookup is performed under the Confluent Schema Registry subject that is specified by the avro-confluent.subject parameter.

Connectors that support the Confluent Avro format include the Apache Kafka connector and the Upsert Kafka connector.

Sample code

The following sample code shows how to create tables in the Confluent Avro format by using the Apache Kafka and Upsert Kafka connectors.

  • Example 1: Create a table that uses the original UTF-8 string as the Kafka key and Avro records registered in Confluent Schema Registry as Kafka values by using the Apache Kafka connector.

CREATE TABLE user_created (
  -- Map the column to the original UTF-8 string that is used as the Kafka key. 
  the_kafka_key STRING,
  
  -- Map the columns to the Avro fields that are used as Kafka values. 
  id STRING,
  name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events_example1',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- Configure the Kafka key in the UTF-8 string format and use the column named the_kafka_key in the table. 
  'key.format' = 'raw',
  'key.fields' = 'the_kafka_key',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY'
);

The following DML statement writes data to the Kafka table:

INSERT INTO user_created
SELECT
  -- Copy the user ID to the column that is mapped to the Kafka key. 
  id as the_kafka_key,

  -- Obtain all Kafka values. 
  id, name, email
FROM some_table
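
Because the format works in both directions, the table from Example 1 can also be used as a source. The following minimal query reads the records back; as described in the background information, the write schema of each record is fetched from the registry by using the schema ID embedded in the record.

SELECT the_kafka_key, id, name, email FROM user_created;
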
  • Example 2: Create a table that uses the Avro records registered in Confluent Schema Registry as the Kafka keys and Kafka values by using the Apache Kafka connector.

CREATE TABLE user_created (  
  -- Map the column to the Avro field id that is used as the Kafka key. 
  kafka_key_id STRING,
  
  -- Map the columns to the Avro fields that are used as Kafka values. 
  id STRING,
  name STRING, 
  email STRING 
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events_example2',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- Note: In most cases, schema upgrades are not backward or forward compatible in Kafka keys due to hash partitioning. 
  'key.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://localhost:8082',
  'key.fields' = 'kafka_key_id',

  -- In this example, both the Kafka key and the Kafka value in the Avro format contain the id field. 
  -- Add a prefix to the column that is associated with the Kafka key field in the table to avoid conflicts. 
  'key.fields-prefix' = 'kafka_key_',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY',
   
  -- In Flink 1.13 and later, the subject has a default value, which can be overridden. 
  'key.avro-confluent.subject' = 'user_events_example2-key2',
  'value.avro-confluent.subject' = 'user_events_example2-value2'
);
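
For reference, a write statement for Example 2 has the same shape as the one in Example 1. The following is a minimal sketch that assumes the same hypothetical source table some_table with the id, name, and email columns.

INSERT INTO user_created
SELECT
  -- Copy the user ID to the column that is mapped to the Kafka key. 
  id AS kafka_key_id,

  -- Obtain all Kafka values. 
  id, name, email
FROM some_table;
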
  • Example 3: Create a table that uses Avro records registered in Confluent Schema Registry as Kafka values by using the Upsert Kafka connector.

CREATE TABLE user_created (  
  -- Map the column to the original UTF-8 string that is used as the Kafka key. 
  kafka_key_id STRING,
  
  -- Map the columns to the Avro fields that are used as Kafka values. 
  id STRING, 
  name STRING, 
  email STRING, 
  
  -- Specify a primary key to define the UPSERT behavior when the Upsert Kafka connector is used. 
  PRIMARY KEY (kafka_key_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_events_example3',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- Use UTF-8 strings as Kafka keys. 
  -- In this example, the key.fields parameter is not specified because the value of this parameter is determined by the primary key of the table. 
  'key.format' = 'raw',
  
  -- In this example, both the Kafka key and the Kafka value in the Avro format contain the id field. 
  -- Add a prefix to the column that is associated with the Kafka key field in the table to avoid conflicts. 
  'key.fields-prefix' = 'kafka_key_',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY'
);
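
The following write statement is a minimal sketch for Example 3, again assuming the hypothetical source table some_table. Because kafka_key_id is the primary key, rows that share the same ID are written as updates of one another, which is the UPSERT behavior that the Upsert Kafka connector provides.

INSERT INTO user_created
SELECT
  -- The primary key column is serialized as the raw UTF-8 Kafka key. 
  id AS kafka_key_id,
  id, name, email
FROM some_table;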

Parameters

| Parameter | Required | Default value | Data type | Description |
| --- | --- | --- | --- | --- |
| format | Yes | No default value | STRING | The format that you declare to use. If you want to use the Confluent Avro format, set this parameter to avro-confluent. |
| avro-confluent.basic-auth.credentials-source | No | No default value | STRING | The source of the basic authentication credentials for Confluent Schema Registry. |
| avro-confluent.basic-auth.user-info | No | No default value | STRING | The basic authentication user information for Confluent Schema Registry. |
| avro-confluent.bearer-auth.credentials-source | No | No default value | STRING | The source of the bearer authentication credentials for Confluent Schema Registry. |
| avro-confluent.bearer-auth.token | No | No default value | STRING | The bearer authentication token for Confluent Schema Registry. |
| avro-confluent.properties | No | No default value | MAP | A map of properties that is forwarded to Confluent Schema Registry. This parameter is suitable for options that are not officially exposed as Flink configuration options. Important: Flink configuration options take precedence over this parameter. |
| avro-confluent.ssl.keystore.location | No | No default value | STRING | The location of the SSL keystore. |
| avro-confluent.ssl.keystore.password | No | No default value | STRING | The password of the SSL keystore. |
| avro-confluent.ssl.truststore.location | No | No default value | STRING | The location of the SSL truststore. |
| avro-confluent.ssl.truststore.password | No | No default value | STRING | The password of the SSL truststore. |
| avro-confluent.subject | No | No default value | STRING | The Confluent Schema Registry subject under which the schema used by the Confluent Avro format during serialization is registered. By default, the Apache Kafka and Upsert Kafka connectors use <topic_name>-value or <topic_name>-key as the subject name if the Confluent Avro format is used as the value or key format. If you use the file system connector for a result table, this parameter is required. |
| avro-confluent.url | Yes | No default value | STRING | The URL of Confluent Schema Registry that is used to obtain or register schemas. |
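
The authentication parameters are passed to the format with the same key or value prefix as the avro-confluent.url parameter in the preceding examples. The following sketch shows how basic authentication credentials for Confluent Schema Registry might be configured; the table name, topic, endpoint, and credentials are placeholders, and USER_INFO instructs the registry client to read the credentials from the user info string.

CREATE TABLE secured_user_events (
  id STRING,
  name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events_secured',
  'properties.bootstrap.servers' = 'localhost:9092',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'https://localhost:8082',

  -- Read the basic authentication credentials from the user info string. 
  'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'value.avro-confluent.basic-auth.user-info' = 'registry_user:registry_password'
);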

Data type mappings

Data type mappings between Flink SQL data types and Confluent Avro data types are similar to data type mappings between Flink SQL data types and Avro data types. The following table describes the mappings between Flink SQL data types and Avro data types.

| Flink SQL data type | Avro data type |
| --- | --- |
| CHAR, VARCHAR, and STRING | STRING |
| BOOLEAN | BOOLEAN |
| BINARY and VARBINARY | BYTES |
| DECIMAL | FIXED. Note: The decimal logical type is used; the value is a decimal number with precision. |
| TINYINT | INT |
| SMALLINT | INT |
| INT | INT |
| BIGINT | LONG |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | INT. Note: The date logical type is used; the value is a date. |
| TIME | INT. Note: The time-millis logical type is used; the value is the time of day in milliseconds. |
| TIMESTAMP | LONG. Note: The timestamp-millis logical type is used; the value is a timestamp in milliseconds. |
| ARRAY | ARRAY |
| MAP. Note: The keys must be of the STRING, CHAR, or VARCHAR data type. | MAP |
| MULTISET. Note: The elements must be of the STRING, CHAR, or VARCHAR data type. | MAP |
| ROW | RECORD |

In addition, Flink can read and write nullable data types. A nullable data type is mapped to Avro union(something, null), where something is the Avro data type that is converted from the corresponding Flink data type.

Note

For more information about Avro data types, see the Avro specification.
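
The following table definition is a sketch (with hypothetical table and topic names) that exercises several rows of the preceding mapping table, including the nullable mapping.

CREATE TABLE type_mapping_example (
  id STRING NOT NULL,                   -- Avro STRING without a null branch
  note STRING,                          -- Nullable: Avro union(STRING, null)
  amount DECIMAL(10, 2),                -- Avro FIXED with the decimal logical type
  birthday DATE,                        -- Avro INT with the date logical type
  updated_at TIMESTAMP(3),              -- Avro LONG with the timestamp-millis logical type
  tags ARRAY<STRING>,                   -- Avro ARRAY
  attributes MAP<STRING, STRING>,       -- Avro MAP with STRING keys
  address ROW<city STRING, zip STRING>  -- Avro RECORD
) WITH (
  'connector' = 'kafka',
  'topic' = 'type_mapping_example',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8082'
);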