Realtime Compute for Apache Flink:Manage Kafka JSON catalogs

Last Updated: Feb 06, 2024

After you create a Kafka JSON catalog, you can access JSON-formatted topics of a Kafka cluster in the console of fully managed Flink without the need to define a schema. This topic describes how to create, view, use, and drop a Kafka JSON catalog in the console of fully managed Flink.

Background information

A Kafka JSON catalog automatically parses JSON-formatted messages to infer the schema of a topic. Therefore, you can use a JSON catalog to obtain specific fields of the messages without the need to declare the schema of a Kafka table in Flink SQL. When you use a Kafka JSON catalog, take note of the following points:

  • The name of a table of a Kafka JSON catalog matches the name of a topic of the Kafka cluster. This way, you do not need to execute DDL statements to register the Kafka table to access the topic of the Kafka cluster. This improves the efficiency and accuracy of data development.

  • Tables of Kafka JSON catalogs can be used as source tables in Flink SQL deployments.

  • You can use Kafka JSON catalogs together with the CREATE TABLE AS statement to synchronize schema changes.

This topic describes the operations that you can perform to manage Kafka JSON catalogs.

Limits

  • Kafka JSON catalogs support only JSON-formatted topics.

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.2 or later supports Kafka JSON catalogs.

    Note

    If your deployments use VVR 4.X, we recommend that you upgrade the VVR version of your deployments to VVR 6.0.2 or later before you use Kafka JSON catalogs.

  • You cannot modify an existing Kafka JSON catalog by executing DDL statements.

  • You can only query data tables by using Kafka JSON catalogs. You are not allowed to create, modify, or delete databases and tables by using Kafka JSON catalogs.

    Note

    If you use Kafka JSON catalogs together with the CREATE DATABASE AS statement or the CREATE TABLE AS statement, topics can be automatically created.

  • You cannot use Kafka JSON catalogs to read data from or write data to Kafka clusters for which SSL-based authentication or Simple Authentication and Security Layer (SASL) authentication is enabled.

  • Tables of Kafka JSON catalogs can be used as source tables in Flink SQL deployments, but cannot be used as result tables or as dimension tables (lookup tables).

Create a Kafka JSON catalog

  1. In the code editor of the Scripts tab on the SQL Editor page, enter one of the following statements to create a Kafka JSON catalog (a filled-in sample is provided after this procedure):

    • Statement used to create a Kafka JSON catalog for a self-managed Kafka cluster or an E-MapReduce (EMR) Kafka cluster

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100'
      );
    • Statement used to create a Kafka JSON catalog for an ApsaraMQ for Kafka instance

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100',
       'aliyun.kafka.accessKeyId'='<aliyunAccessKeyId>',
       'aliyun.kafka.accessKeySecret'='<aliyunAccessKeySecret>',
       'aliyun.kafka.instanceId'='<aliyunKafkaInstanceId>',
       'aliyun.kafka.endpoint'='<aliyunKafkaEndpoint>',
       'aliyun.kafka.regionId'='<aliyunKafkaRegionId>'
      );

    The following list describes the parameters in the preceding statements:

    • YourCatalogName
      Data type: STRING
      Description: The name of the Kafka JSON catalog.
      Required: Yes
      Remarks: Enter a custom name.
      Important: You must remove the angle brackets (<>) when you replace the value of the parameter with the name of your catalog. Otherwise, an error is returned during the syntax check.

    • type
      Data type: STRING
      Description: The type of the catalog.
      Required: Yes
      Remarks: Set the value to kafka.

    • properties.bootstrap.servers
      Data type: STRING
      Description: The IP addresses or endpoints of Kafka brokers.
      Required: Yes
      Remarks: Format: host1:port1,host2:port2,host3:port3. Separate multiple host:port pairs with commas (,).

    • format
      Data type: STRING
      Description: The format of Kafka messages.
      Required: Yes
      Remarks: Only the JSON format is supported. Fully managed Flink parses JSON-formatted Kafka messages to obtain the schema.

    • default-database
      Data type: STRING
      Description: The name of the Kafka cluster.
      Required: No
      Remarks: Default value: kafka. A catalog references a table as catalog_name.db_name.table_name, and this parameter provides db_name. Kafka does not have the concept of a database, so you can set this parameter to a custom string to change db_name for the Kafka cluster.

    • key.fields-prefix
      Data type: STRING
      Description: The prefix that is added to the fields that are parsed from the key field in a Kafka message. You can configure this parameter to prevent name conflicts after the key field is parsed.
      Required: No
      Remarks: Default value: key_. If the name of a field in the key is a, the column that is obtained after the key field in the Kafka message is parsed is named key_a.
      Note: The value of the key.fields-prefix parameter cannot be the same as the prefix in the value of the value.fields-prefix parameter. For example, if the value.fields-prefix parameter is set to test1_value_, you cannot set the key.fields-prefix parameter to test1_.

    • value.fields-prefix
      Data type: STRING
      Description: The prefix that is added to the fields that are parsed from the value field in a Kafka message. You can configure this parameter to prevent name conflicts after the value field is parsed.
      Required: No
      Remarks: Default value: value_. If the name of a field in the value is b, the column that is obtained after the value field in the Kafka message is parsed is named value_b.
      Note: The value of the value.fields-prefix parameter cannot be the same as the prefix in the value of the key.fields-prefix parameter. For example, if the key.fields-prefix parameter is set to test2_value_, you cannot set the value.fields-prefix parameter to test2_.

    • timestamp-format.standard
      Data type: STRING
      Description: The format of TIMESTAMP fields in JSON-formatted Kafka messages. Fully managed Flink parses the fields in the format that you specify. If a field cannot be parsed in the specified format, fully managed Flink attempts to parse it in the other format.
      Required: No
      Remarks: Valid values:
      • SQL (default value)
      • ISO-8601

    • infer-schema.flatten-nested-columns.enable
      Data type: BOOLEAN
      Description: Specifies whether to recursively expand nested columns in the JSON text when the value field of a JSON-formatted Kafka message is parsed.
      Required: No
      Remarks: Valid values:
      • true: Nested columns are recursively expanded. Fully managed Flink uses the path that indexes the expanded column as the column name. For example, the column col in {"nested": {"col": true}} is named nested.col after it is expanded.
        Note: If you set this parameter to true, we recommend that you use this parameter together with the CREATE TABLE AS statement. Other DML statements cannot automatically expand nested columns.
      • false (default): Nested types are parsed as the STRING type.

    • infer-schema.primitive-as-string
      Data type: BOOLEAN
      Description: Specifies whether to infer all basic types as the STRING type when the value fields of JSON-formatted Kafka messages are parsed.
      Required: No
      Remarks: Valid values:
      • true: All basic types are inferred as the STRING type.
      • false (default): Data types are inferred based on the data type mappings.

    • infer-schema.parse-key-error.field-name
      Data type: STRING
      Description: The column name suffix that is used for the raw data of the key field. If the key field of a JSON-formatted Kafka message is specified but fails to be parsed, a column whose name is the key.fields-prefix prefix followed by the value of this parameter is added to the schema of the table that matches the topic. The column is of the VARBINARY type and contains the raw data of the key field.
      Required: No
      Remarks: Default value: col. For example, if the value field is parsed as value_name and the key field is specified but fails to be parsed, the returned schema of the table that matches the topic contains two fields: key_col and value_name.

    • infer-schema.compacted-topic-as-upsert-table
      Data type: BOOLEAN
      Description: Specifies whether to use the table as an Upsert Kafka table when the log cleanup policy of the Kafka topic is compact and the key field is specified.
      Required: No
      Remarks: Default value: true. You must set this parameter to true when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka.
      Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter.

    • max.fetch.records
      Data type: INT
      Description: The maximum number of JSON-formatted messages that the system attempts to consume when it parses messages to infer the schema.
      Required: No
      Remarks: Default value: 100.

    • aliyun.kafka.accessKeyId
      Data type: STRING
      Description: The AccessKey ID of your Alibaba Cloud account. For more information about how to obtain the AccessKey ID, see Create an AccessKey pair.
      Required: No
      Remarks: You must configure this parameter when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka.
      Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter.

    • aliyun.kafka.accessKeySecret
      Data type: STRING
      Description: The AccessKey secret of your Alibaba Cloud account. For more information about how to obtain the AccessKey secret, see Create an AccessKey pair.
      Required: No
      Remarks: You must configure this parameter when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka.
      Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter.

    • aliyun.kafka.instanceId
      Data type: STRING
      Description: The ID of the ApsaraMQ for Kafka instance. You can view the ID on the Instance Details page of the ApsaraMQ for Kafka console.
      Required: No
      Remarks: You must configure this parameter when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka.
      Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter.

    • aliyun.kafka.endpoint
      Data type: STRING
      Description: The endpoint of ApsaraMQ for Kafka. For more information, see Endpoints.
      Required: No
      Remarks: You must configure this parameter when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka.
      Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter.

    • aliyun.kafka.regionId
      Data type: STRING
      Description: The region ID of the ApsaraMQ for Kafka instance to which the topic belongs. For more information, see Endpoints.
      Required: No
      Remarks: You must configure this parameter when you execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data to ApsaraMQ for Kafka.
      Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.2 or later supports this parameter.

  2. Select the statement that creates the catalog and click Run on the left side of the code.

  3. In the Catalogs pane on the left side of the Catalog List page, view the catalog that you create.
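
The following example shows the first statement template with sample values filled in. The broker addresses and the catalog name are placeholders; replace them with the values of your own Kafka cluster. The remaining option values are the documented defaults.

  CREATE CATALOG kafkaJsonCatalog WITH(
   'type'='kafka',
   'properties.bootstrap.servers'='192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092',
   'format'='json',
   'default-database'='kafka',
   'key.fields-prefix'='key_',
   'value.fields-prefix'='value_',
   'timestamp-format.standard'='SQL',
   'infer-schema.flatten-nested-columns.enable'='false',
   'infer-schema.primitive-as-string'='false',
   'infer-schema.parse-key-error.field-name'='col',
   'infer-schema.compacted-topic-as-upsert-table'='true',
   'max.fetch.records'='100'
  );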

View a Kafka JSON catalog

  1. In the code editor of the Scripts tab on the SQL Editor page, enter the following statement:

    DESCRIBE `${catalog_name}`.`${db_name}`.`${topic_name}`;

    The following list describes the parameters:

    • ${catalog_name}: The name of the Kafka JSON catalog.
    • ${db_name}: The name of the Kafka cluster.
    • ${topic_name}: The name of the Kafka topic.

  2. Select the code that is used to view the catalog and click Run on the left side of the code.

    After the statement is executed, you can view the information about the table that matches the topic in the result.
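
    For example, the following statement queries the schema of the table that matches a topic named testTopic in a catalog named kafkaJsonCatalog. These are sample names; the same names are used in the schema inference section of this topic.

      DESCRIBE `kafkaJsonCatalog`.`kafka`.`testTopic`;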

Use a Kafka JSON catalog

  • If a table of the Kafka JSON catalog is used as a source table, you can read data from the Kafka topic that matches the table. A sample query that selects specific inferred columns is provided after this list.

    INSERT INTO ${other_sink_table}
    SELECT...
    FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}`/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    Note

    If you need to specify other parameters in the WITH clause when you use a Kafka JSON catalog, we recommend that you use SQL hints to add them. In the preceding SQL statement, SQL hints are used to specify that the consumption starts from the earliest offset. For more information about other parameters, see Create an ApsaraMQ for Kafka source table and Create an ApsaraMQ for Kafka result table.

  • If a table of the Kafka JSON catalog is used as an ApsaraMQ for Kafka source table, you can synchronize data from the Kafka topic that matches the table to the destination table by using the CREATE TABLE AS statement.

    • Synchronize data from a single topic in real time.

      CREATE TABLE IF NOT EXISTS `${target_table_name}`
      WITH(...)
      AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}`
      /*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    • Synchronize data from multiple topics in a deployment.

      BEGIN STATEMENT SET;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
      AS TABLE `kafka-catalog`.`kafka`.`topic0`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
      AS TABLE `kafka-catalog`.`kafka`.`topic1`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
      AS TABLE `kafka-catalog`.`kafka`.`topic2`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      END;

      You can use the CREATE TABLE AS statement together with Kafka JSON catalogs to synchronize data from multiple Kafka topics in a single deployment. Make sure that the following conditions are met:

      • The topic-pattern option is not configured for any of the tables that match the topics.

      • The Kafka parameters of all the tables have the same values. This applies to all parameters whose names start with the properties. prefix, such as properties.bootstrap.servers and properties.group.id.

      • The values of the scan.startup.mode parameter are the same for all the tables. The scan.startup.mode parameter can be set only to group-offsets, latest-offset, or earliest-offset.

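The following statement is a sample query that reads specific inferred columns from a table that is used as a source table. It assumes a topic named testTopic whose inferred schema contains the physical columns value_name and value_ts (the value_ prefix is added by default, as described in the schema inference section of this topic). The query also reads the offset and timestamp metadata columns that are added by default.

  INSERT INTO ${other_sink_table}
  SELECT value_name, value_ts, `offset`, `timestamp`
  FROM `kafkaJsonCatalog`.`kafka`.`testTopic`
  /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;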

Note

For more information about how to synchronize data from a source table in a Kafka JSON catalog to a destination table in a destination catalog, see Ingest log data into data warehouses in real time.

Drop a Kafka JSON catalog

Warning

After you drop a Kafka JSON catalog, the deployments that are running are not affected. However, the deployments that use a table of the catalog can no longer find the table when the deployments are published or restarted. Proceed with caution when you drop a Kafka JSON catalog.

  1. In the code editor of the Scripts tab on the SQL Editor page, enter the following statement:

    DROP CATALOG ${catalog_name};

    catalog_name specifies the name of the Kafka JSON catalog that you want to drop.

  2. Right-click the statement that is used to drop the catalog and choose Run from the shortcut menu.

  3. View the Catalogs pane on the left side of the Catalog List page to check whether the catalog is dropped.
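
For example, the following statement drops the sample catalog that is named kafkaJsonCatalog:

  DROP CATALOG kafkaJsonCatalog;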

Schema inference description

To allow you to easily use the table that is obtained after you configure a Kafka JSON catalog, the system automatically adds default configuration parameters, metadata columns, and a primary key to the table. This section describes the schema and the default parameters of such a table.

  • Table schema

    When JSON-formatted Kafka messages are parsed to obtain the topic schema, the system attempts to consume at most the number of messages that is specified by the max.fetch.records parameter. The system parses the schema of each data record and merges the schemas into the topic schema. The system parses the messages based on the data type mappings that are used when you use the CREATE TABLE AS statement to synchronize data of Kafka tables.

    Important
    • When a Kafka JSON catalog is used to infer the topic schema, a consumer group is created to consume the data of the topic. A consumer group whose name includes a specific prefix indicates that the consumer group was created by the catalog.

    • If you want to obtain data from an ApsaraMQ for Kafka table, we recommend that you use a Kafka JSON catalog of Realtime Compute for Apache Flink that uses VVR 6.0.7 or later. For Realtime Compute for Apache Flink that uses VVR of a version earlier than 6.0.7, consumer groups are not automatically deleted. As a result, you may receive an alert notification about message accumulation in a consumer group.

    A topic schema consists of the following parts:

    • Physical columns

      By default, physical columns are parsed based on the key and value fields of a Kafka message. Prefixes are added to the obtained column names.

      If the key field is specified but fails to be parsed, a column whose name is the key.fields-prefix prefix followed by the value of the infer-schema.parse-key-error.field-name parameter is returned. The column is of the VARBINARY type.

      After the Kafka JSON catalog obtains a group of Kafka messages, the catalog parses the messages in sequence and merges the physical columns that are obtained from each message into the topic schema based on the following rules:

      • If specific physical columns that are obtained after parsing are not in the topic schema, the Kafka JSON catalog automatically adds the columns to the topic schema.

      • If specific physical columns that are obtained after parsing have the same names as columns in the topic schema, the Kafka JSON catalog handles the columns in one of the following ways:

        • If the columns are of the same data type but different precision, the Kafka JSON catalog uses the larger precision of the columns that have the same name.

        • If the columns are of different data types, the Kafka JSON catalog uses the smallest common parent type in the type merging hierarchy as the type of the columns that have the same name. For example, if columns of the DECIMAL and FLOAT types are merged, the columns are merged into the DOUBLE type to retain the precision.

      For example, if a Kafka topic contains three data records, the physical columns that are parsed from the three records are merged into one topic schema based on the preceding rules.

    • Metadata columns

      By default, the metadata columns named partition, offset, and timestamp are added. The following list describes the metadata columns:

      • partition
        Column name: partition
        Type: INT NOT NULL
        Description: The partition of the Kafka message.

      • offset
        Column name: offset
        Type: BIGINT NOT NULL
        Description: The offset of the Kafka message.

      • timestamp
        Column name: timestamp
        Type: TIMESTAMP_LTZ(3) NOT NULL
        Description: The timestamp of the Kafka message.

    • Rules for the default primary key that is added

      If a table that is obtained from a Kafka JSON catalog is used as a source table, the partition and offset metadata columns are used as the primary key. This ensures that data is not duplicated.

    Note

    If the table schema that is inferred by the Kafka JSON catalog is not as expected, you can use the CREATE TEMPORARY TABLE ... LIKE syntax to declare a temporary table with the desired schema. For example, assume that the JSON data contains a ts field in the '2023-01-01 12:00:01' format. The Kafka JSON catalog automatically infers the ts field as the TIMESTAMP type. If you want to use the ts field as the STRING type, you can declare the table by using the CREATE TEMPORARY TABLE ... LIKE syntax. In the following sample code, the field is named value_ts because the value_ prefix is added to value fields in the default configuration.

    CREATE TEMPORARY TABLE tempTable (
        value_name STRING,
        value_ts STRING
    ) LIKE `kafkaJsonCatalog`.`kafka`.`testTopic`;
  • Table parameters added by default

    • connector
      Description: The type of the connector.
      Remarks: Set the value to kafka or upsert-kafka.

    • topic
      Description: The name of the topic.
      Remarks: Set the value to the name of the table that you declare in the Kafka JSON catalog.

    • properties.bootstrap.servers
      Description: The IP addresses or endpoints of Kafka brokers.
      Remarks: Set the value to the same value as the properties.bootstrap.servers parameter of the Kafka JSON catalog.

    • value.format
      Description: The format that the Flink Kafka connector uses to serialize or deserialize the value fields in a Kafka message.
      Remarks: Set the value to json.

    • value.fields-prefix
      Description: A custom prefix for all value fields in Kafka messages. You can configure this parameter to prevent name conflicts with the key fields or metadata fields.
      Remarks: Set the value to the same value as the value.fields-prefix parameter of the Kafka JSON catalog.

    • value.json.infer-schema.flatten-nested-columns.enable
      Description: Specifies whether to recursively expand nested columns in a JSON text when the value fields in the JSON-formatted Kafka message are parsed.
      Remarks: Set the value to the same value as the infer-schema.flatten-nested-columns.enable parameter of the Kafka JSON catalog.

    • value.json.infer-schema.primitive-as-string
      Description: Specifies whether to infer all basic types as the STRING type when the value fields in the JSON-formatted Kafka message are parsed.
      Remarks: Set the value to the same value as the infer-schema.primitive-as-string parameter of the Kafka JSON catalog.

    • value.fields-include
      Description: The policy that is used to process the key field when value fields are parsed.
      Remarks: Set the value to EXCEPT_KEY. If this parameter is set to EXCEPT_KEY, the key field is excluded when value fields are parsed. If the key field is specified, you must configure this parameter.

    • key.format
      Description: The format that the Flink Kafka connector uses to serialize or deserialize the key field in a Kafka message.
      Remarks: Set the value to json or raw. If the key field is specified, you must configure this parameter. If the key field is specified but fails to be parsed, set the value of this parameter to raw. If the key field is specified and is parsed, set the value of this parameter to json.

    • key.fields-prefix
      Description: A custom prefix for all key fields in Kafka messages. You can configure this parameter to prevent name conflicts with the value fields.
      Remarks: Set the value to the same value as the key.fields-prefix parameter of the Kafka JSON catalog. If the key field is specified, you must configure this parameter.

    • key.fields
      Description: The fields that are parsed from the key field in a Kafka message.
      Remarks: The system automatically enters the key fields in the table. If the key field is specified and the table is not an Upsert Kafka table, you must configure this parameter.
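
    To illustrate how these default parameters fit together, the following DDL is a hypothetical, hand-written equivalent of the table that the catalog resolves for a topic named testTopic whose messages carry the key {"id": "1001"} and the value {"name": "Alice", "ts": "2023-01-01 12:00:01"}. The catalog generates this definition internally, so you do not execute it yourself. The column types are illustrative assumptions that are based on the inference rules described in this section.

      -- Hypothetical resolved table for the topic testTopic. Illustration only; column types are assumptions.
      CREATE TABLE testTopic (
        key_id STRING,               -- parsed from the key field, with the default key_ prefix
        value_name STRING,           -- parsed from the value field, with the default value_ prefix
        value_ts TIMESTAMP,          -- inferred as TIMESTAMP; see the Note in this section for how to override the type
        `partition` INT METADATA VIRTUAL,              -- metadata column that is added by default
        `offset` BIGINT METADATA VIRTUAL,              -- metadata column that is added by default
        `timestamp` TIMESTAMP_LTZ(3) METADATA VIRTUAL  -- metadata column that is added by default
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'testTopic',
        'properties.bootstrap.servers' = '<brokers>',
        'value.format' = 'json',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'key.format' = 'json',
        'key.fields-prefix' = 'key_',
        'key.fields' = 'key_id'
      );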