
Realtime Compute for Apache Flink: Manage Kafka JSON catalogs

Last Updated: Jan 14, 2026

After you configure a Kafka JSON catalog, you can directly access JSON-formatted topics in a Kafka cluster without defining a schema when you develop jobs in Realtime Compute for Apache Flink. This topic describes how to create, view, and delete a Kafka JSON catalog.

Background information

A Kafka JSON catalog infers the schema of a topic by automatically parsing JSON-formatted messages. This lets you retrieve specific field information from messages without declaring a Kafka table schema in Flink SQL. Kafka JSON catalogs have the following features:

  • The table name in a Kafka JSON catalog corresponds to the Kafka topic name. You do not need to manually register the Kafka table with a Data Definition Language (DDL) statement. This improves development efficiency and accuracy.

  • Tables provided by a Kafka JSON catalog can be used directly as source tables in Flink SQL jobs.

  • You can use Kafka JSON catalogs with the CREATE TABLE AS (CTAS) statement to synchronize data with schema changes.

Limits

  • Kafka JSON catalogs support only topics that contain messages in JSON format.

  • Only compute engines that use VVR 6.0.2 or later support Kafka JSON catalogs.

    Note

    If you are using VVR 4.x, you must upgrade your job to VVR 6.0.2 or later before you can use Kafka JSON catalogs.

  • You cannot modify existing Kafka JSON catalogs using DDL statements.

  • You can only query data tables. You cannot create, modify, or delete databases or tables.

    Note

    In CREATE DATABASE AS (CDAS) or CREATE TABLE AS (CTAS) scenarios that use a Kafka JSON catalog, topics can be created automatically.

  • Kafka JSON catalogs cannot read from or write to Kafka clusters that have SSL or SASL authentication enabled.

  • Tables provided by Kafka JSON catalogs can be used directly as source tables in Flink SQL jobs. They cannot be used as sink tables or lookup dimension tables.

  • ApsaraMQ for Kafka currently does not support deleting consumer groups through the same API operation as open source Apache Kafka. For the catalog to automatically delete the consumer groups that it creates, you must configure the aliyun.kafka.instanceId, aliyun.kafka.accessKeyId, aliyun.kafka.accessKeySecret, aliyun.kafka.endpoint, and aliyun.kafka.regionId parameters when you create a Kafka JSON catalog. For more information, see Comparison between ApsaraMQ for Kafka and open source Apache Kafka.

Precautions

A Kafka JSON catalog generates a table schema by parsing sample data. For topics with inconsistent data formats, the catalog returns the widest possible schema, retaining all columns by default. If the data format of a topic changes, the table schema inferred by the catalog can be inconsistent at different times. If a different schema is inferred before and after a job restart, job execution issues may occur.

For example, a Flink SQL job references a table in a Kafka JSON catalog. If the job restarts from a savepoint after running for a period of time, the catalog may infer a new schema that differs from the one used in the previous run, while the job's execution plan still reflects the old schema. This can lead to mismatches such as incorrect filter conditions or field values passed to downstream operators. To prevent this, you can create a Kafka table with the CREATE TEMPORARY TABLE statement in your Flink SQL job to enforce a fixed schema.
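
The following is a minimal sketch of this approach. The topic name, field names, and broker addresses are hypothetical placeholders, and the columns must be adapted to the actual data in your topic.

    -- Hypothetical sketch: pin the schema of a JSON topic instead of relying on catalog inference.
    -- The topic, fields, and brokers below are placeholders; replace them with your own values.
    CREATE TEMPORARY TABLE orders_fixed (
      `status` STRING,
      `amount` DOUBLE,
      `ts` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'orders',
      'properties.bootstrap.servers' = 'host1:9092,host2:9092',
      'format' = 'json',
      'scan.startup.mode' = 'earliest-offset'
    );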

Create a Kafka JSON catalog

  1. In the text editor on the Data Exploration tab, enter the statement to configure a Kafka JSON catalog. A filled-in example appears after these steps.

    • Self-managed Kafka cluster or EMR Kafka cluster

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100'
      );
    • ApsaraMQ for Kafka

      CREATE CATALOG <YourCatalogName> WITH(
       'type'='kafka',
       'properties.bootstrap.servers'='<brokers>',
       'format'='json',
       'default-database'='<dbName>',
       'key.fields-prefix'='<keyPrefix>',
       'value.fields-prefix'='<valuePrefix>',
       'timestamp-format.standard'='<timestampFormat>',
       'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
       'infer-schema.primitive-as-string'='<primitiveAsString>',
       'infer-schema.parse-key-error.field-name'='<parseKeyErrorFieldName>',
       'infer-schema.compacted-topic-as-upsert-table'='true',
       'max.fetch.records'='100',
       'aliyun.kafka.accessKeyId'='<aliyunAccessKeyId>',
       'aliyun.kafka.accessKeySecret'='<aliyunAccessKeySecret>',
       'aliyun.kafka.instanceId'='<aliyunKafkaInstanceId>',
       'aliyun.kafka.endpoint'='<aliyunKafkaEndpoint>',
       'aliyun.kafka.regionId'='<aliyunKafkaRegionId>'
      );

    The following list describes the parameters. Each entry shows the parameter name, its data type, whether it is required, and remarks.

    • YourCatalogName (String, required): The name of the Kafka JSON catalog. Enter a custom name in English.

      Important: Remove the angle brackets (<>) after you replace the placeholder with your catalog name. Otherwise, a syntax check error occurs.

    • type (String, required): The catalog type. The value must be kafka.

    • properties.bootstrap.servers (String, required): The Kafka broker address. Format: host1:port1,host2:port2,host3:port3. Separate multiple addresses with commas (,).

    • format (String, required): The Kafka message format. Only JSON is supported. Flink parses JSON-formatted Kafka messages to obtain the schema.

    • default-database (String, optional): The name of the Kafka cluster. Default value: kafka. A catalog locates a table by a three-part name in the form catalog_name.db_name.table_name, and this parameter specifies the default db_name. Because Kafka does not have the concept of a database, you can use any string to represent the Kafka cluster as the database.

    • key.fields-prefix (String, optional): A custom prefix added to the names of fields parsed from the message key to avoid naming conflicts. Default value: key_. For example, if your key field is named a, the parsed field name is key_a.

      Note: The value of key.fields-prefix cannot be a prefix of the value of value.fields-prefix. For example, if you set value.fields-prefix to test1_value_, you cannot set key.fields-prefix to test1_.

    • value.fields-prefix (String, optional): A custom prefix added to the names of fields parsed from the message body (value) to avoid naming conflicts. Default value: value_. For example, if your value field is named b, the parsed field name is value_b.

      Note: The value of value.fields-prefix cannot be a prefix of the value of key.fields-prefix. For example, if you set key.fields-prefix to test2_value_, you cannot set value.fields-prefix to test2_.

    • timestamp-format.standard (String, optional): The format used to parse Timestamp fields in JSON-formatted messages. The system first tries the configured format. If parsing fails, it automatically tries other formats. Valid values: SQL (default) and ISO-8601.

    • infer-schema.flatten-nested-columns.enable (Boolean, optional): Specifies whether to recursively expand nested columns in the JSON message body (value) during parsing. Valid values:

      • true: Recursively expand nested columns. For an expanded column, Flink uses the path that indexes the value as the column name. For example, the column col in {"nested": {"col": true}} is expanded as nested.col.

        Note: If you set this parameter to true, use it with the CREATE TABLE AS (CTAS) statement. Other DML statements do not support automatic expansion of nested columns.

      • false (default): Treat nested types as the String type.

    • infer-schema.primitive-as-string (Boolean, optional): Specifies whether to infer all primitive data types as the String type when the JSON message body (value) is parsed. Valid values:

      • true: Infer all primitive types as String.

      • false (default): Infer types based on the basic rules.

    • infer-schema.parse-key-error.field-name (String, optional): If the message key is not empty but fails to parse, a field of the VARBINARY type is added to the table schema to represent the message key data. The column name is the key.fields-prefix value concatenated with the value of this parameter. Default value: col. For example, if the message body parses to a field named value_name and the message key is not empty but fails to parse, the returned schema contains the two fields key_col and value_name.

    • infer-schema.compacted-topic-as-upsert-table (Boolean, optional): Specifies whether to use the table as an Upsert Kafka table when the cleanup policy of the Kafka topic is compact and the message key is not empty. Default value: true. Set this parameter to true when you use the CTAS or CDAS syntax to synchronize data to ApsaraMQ for Kafka.

      Note: Only compute engines of VVR 6.0.2 or later support this parameter.

    • max.fetch.records (Int, optional): The maximum number of messages that the catalog consumes when it parses JSON-formatted messages. Default value: 100.

    • aliyun.kafka.accessKeyId (String, optional): The AccessKey ID of your Alibaba Cloud account. For more information, see Create an AccessKey pair. Configure this parameter when you use the CTAS or CDAS syntax to synchronize data to ApsaraMQ for Kafka.

      Note: Only compute engines of VVR 6.0.2 or later support this parameter.

    • aliyun.kafka.accessKeySecret (String, optional): The AccessKey secret of your Alibaba Cloud account. For more information, see Create an AccessKey pair. Configure this parameter when you use the CTAS or CDAS syntax to synchronize data to ApsaraMQ for Kafka.

      Note: Only compute engines of VVR 6.0.2 or later support this parameter.

    • aliyun.kafka.instanceId (String, optional): The ID of the ApsaraMQ for Kafka instance. You can view the ID on the instance details page in the ApsaraMQ for Kafka console. Configure this parameter when you use the CTAS or CDAS syntax to synchronize data to ApsaraMQ for Kafka.

      Note: Only compute engines of VVR 6.0.2 or later support this parameter.

    • aliyun.kafka.endpoint (String, optional): The API endpoint of ApsaraMQ for Kafka. For more information, see Endpoints. Configure this parameter when you use the CTAS or CDAS syntax to synchronize data to ApsaraMQ for Kafka.

      Note: Only compute engines of VVR 6.0.2 or later support this parameter.

    • aliyun.kafka.regionId (String, optional): The region ID of the ApsaraMQ for Kafka instance in which the topic resides. For more information, see Endpoints. Configure this parameter when you use the CTAS or CDAS syntax to synchronize data to ApsaraMQ for Kafka.

      Note: Only compute engines of VVR 6.0.2 or later support this parameter.

  2. Select the statement to create the catalog, and then click Run next to the line numbers on the left.


  3. In the Metadata area on the left, you can view the created catalog.
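
The following is a filled-in example of the self-managed cluster variant from step 1. The catalog name and broker addresses are hypothetical, and the optional schema-inference parameters are left at their defaults.

    CREATE CATALOG kafkaJsonCatalog WITH (
      'type' = 'kafka',
      'properties.bootstrap.servers' = 'host1:9092,host2:9092,host3:9092',
      'format' = 'json',
      'default-database' = 'kafka'
    );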

View a Kafka JSON catalog

  1. In the text editor on the Data Exploration tab, enter the following statement.

    DESCRIBE `${catalog_name}`.`${db_name}`.`${topic_name}`;

    • ${catalog_name}: The name of the Kafka JSON catalog.

    • ${db_name}: The name of the Kafka cluster.

    • ${topic_name}: The name of the Kafka topic.

  2. Select the statement to view the catalog, and then click Run next to the line numbers on the left.

    After the statement runs successfully, you can view the table details in the run result.
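
For example, assuming the catalog created earlier and a topic named testTopic (both names are hypothetical), the following statement returns the inferred table schema:

    DESCRIBE `kafkaJsonCatalog`.`kafka`.`testTopic`;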

Use a Kafka JSON catalog

  • Use the catalog as a source table to read data from a Kafka topic.

    INSERT INTO ${other_sink_table}
    SELECT...
    FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}`/*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    Note

    To specify other WITH parameters when you use a table from a Kafka JSON catalog, you can use SQL hints to add them. For example, the preceding SQL statement uses a SQL hint to specify that consumption starts from the earliest data; an example that combines multiple hint options follows this list. For more information about other parameters, see ApsaraMQ for Kafka source tables and ApsaraMQ for Kafka sink tables.

  • You can use the CREATE TABLE AS (CTAS) statement to synchronize data from a Kafka topic to a target table, using the Kafka topic as the source table.

    • Synchronize a single table in real time.

      CREATE TABLE IF NOT EXISTS `${target_table_name}`
      WITH(...)
      AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}`
      /*+OPTIONS('scan.startup.mode'='earliest-offset')*/;
    • Synchronize multiple tables in one job.

      BEGIN STATEMENT SET;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
      AS TABLE `kafka-catalog`.`kafka`.`topic0`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
      AS TABLE `kafka-catalog`.`kafka`.`topic1`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
      AS TABLE `kafka-catalog`.`kafka`.`topic2`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      END;

      With a Kafka JSON catalog, you can synchronize multiple Kafka tables in the same task. However, the following conditions must be met:

      • The topic-pattern parameter is not configured for any of the Kafka tables.

      • The Kafka configuration for each table must be identical. This means all properties.* settings, including properties.bootstrap.servers and properties.group.id, must be the same.

      • The scan.startup.mode setting for each table must be identical. It can only be set to group-offsets, latest-offset, or earliest-offset.

      For example, the statement set above meets these conditions: all three tables come from the same Kafka JSON catalog, share the same Kafka configuration, and use the same scan.startup.mode value of earliest-offset.
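
As mentioned in the note on SQL hints earlier in this section, you can pass more than one WITH parameter through a single hint. The following sketch combines a startup mode with a consumer group ID; the sink table, catalog, topic, and group names are hypothetical.

    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM `kafkaJsonCatalog`.`kafka`.`testTopic`
    /*+ OPTIONS('scan.startup.mode'='latest-offset', 'properties.group.id'='kafka-catalog-demo') */;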

Note

For complete end-to-end examples of using a Kafka JSON catalog, see Quick Start for real-time log warehousing.

Delete a Kafka JSON catalog

Warning

Deleting a Kafka JSON catalog does not affect running jobs. However, jobs that use tables from that catalog will fail with a "table not found" error when they are published or restarted. Proceed with caution.

  1. In the text editor on the Data Exploration tab, enter the following statement.

    DROP CATALOG ${catalog_name};

    Replace ${catalog_name} with the name of the Kafka JSON catalog that you want to delete.

  2. Select the statement to delete the catalog, right-click, and select Run.

  3. In the Metadata area on the left, check whether the catalog is deleted.

Details of table information from a Kafka JSON catalog

To make it easier to use tables from a Kafka JSON catalog, the catalog adds default configuration parameters, metadata, and primary key information to the inferred table. The following section describes the details of the table information from a Kafka JSON catalog:

  • Schema inference for Kafka tables

    When a Kafka JSON catalog parses JSON-formatted messages to retrieve a topic's schema, the catalog attempts to consume up to max.fetch.records messages. It parses the schema of each data record according to the same basic rules as when Kafka is used as a CTAS data source. It then merges these schemas to form the final schema.

    Important
    • When a Kafka JSON catalog infers a schema, it creates a consumer group to consume the topic's data. The consumer group name uses a prefix to indicate that it was created by the catalog.

    • For ApsaraMQ for Kafka, use Kafka JSON catalogs with VVR 6.0.7 or later. In versions earlier than 6.0.7, these consumer groups are not automatically deleted, which can trigger message accumulation alerts for the consumer groups.

    The schema mainly includes the following parts:

    • Inferred Physical Columns

      A Kafka JSON catalog infers the physical columns of a message from the Kafka message key and body (value). The corresponding prefix is added to the column names.

      If the message key is not empty but cannot be parsed, a column of the VARBINARY type is created. The column name is a concatenation of the key.fields-prefix prefix and the value of the infer-schema.parse-key-error.field-name parameter.

      After pulling a group of Kafka messages, the catalog parses each message and merges the parsed physical columns to create the schema for the entire topic. The merge rules are as follows:

      • If the parsed physical columns contain fields that are not in the result schema, the Kafka JSON catalog automatically adds these fields to the result schema.

      • If columns with the same name appear, they are handled based on the following scenarios:

        • If the types are the same but the precision is different, the type with the higher precision is used.

        • If the types are different, the system uses the smallest common parent type in the type-widening hierarchy as the type of the column with the same name. However, when the Decimal and Float types are merged, they are converted to the Double type to preserve precision.

      For example, suppose a topic contains the three records {"id": 1}, {"id": 2, "name": "a"}, and {"id": 3.5, "name": "b"} in the message body. The catalog merges the parsed columns into a single schema that contains value_id (widened to a type that can hold both integer and fractional values, such as DOUBLE) and value_name (STRING), in addition to any key and metadata columns.

    • Default Metadata Columns

      A Kafka JSON catalog adds three useful metadata columns by default: partition, offset, and timestamp. The details are shown in the following table.

      • partition: Column name partition, type INT NOT NULL. The partition value.

      • offset: Column name offset, type BIGINT NOT NULL. The offset.

      • timestamp: Column name timestamp, type TIMESTAMP_LTZ(3) NOT NULL. The message timestamp.

    • Default PRIMARY KEY constraint

      When a table from a Kafka JSON catalog is consumed as a source table, the metadata columns partition and offset are used as the primary key by default to ensure data uniqueness.

    Note

    If the table schema inferred by the Kafka JSON catalog does not meet your requirements, you can declare a temporary table using the CREATE TEMPORARY TABLE ... LIKE syntax to specify the desired table schema. For example, if JSON data contains a field named ts in the '2023-01-01 12:00:01' format, the Kafka JSON catalog automatically infers the ts field as a TIMESTAMP type. If you want to use the ts field as a STRING type, you can declare the table using the CREATE TEMPORARY TABLE ... LIKE syntax. As shown in the following example, because the value_ prefix is added to the message value fields in the default configuration, the field name is value_ts.

    CREATE TEMPORARY TABLE tempTable (
        value_name STRING,
        value_ts STRING
    ) LIKE `kafkaJsonCatalog`.`kafka`.`testTopic`;
  • Default table parameters

    The catalog sets the following parameters on each inferred table:

    • connector: The connector type. The value is kafka or upsert-kafka.

    • topic: The name of the corresponding topic, which is the same as the table name.

    • properties.bootstrap.servers: The Kafka broker address. The value is taken from the properties.bootstrap.servers parameter in the catalog configuration.

    • value.format: The format that the Flink Kafka connector uses to serialize or deserialize the Kafka message body (value). The value is fixed to json.

    • value.fields-prefix: A custom prefix for all Kafka message body (value) fields, used to avoid name conflicts with message key or metadata fields. The value is taken from the value.fields-prefix parameter in the catalog configuration.

    • value.json.infer-schema.flatten-nested-columns.enable: Specifies whether to recursively expand nested columns in the JSON of the Kafka message body (value). The value is taken from the infer-schema.flatten-nested-columns.enable parameter in the catalog configuration.

    • value.json.infer-schema.primitive-as-string: Specifies whether to infer all primitive data types as the String type for the Kafka message body (value). The value is taken from the infer-schema.primitive-as-string parameter in the catalog configuration.

    • value.fields-include: The policy for handling message key fields in the message body. The value is EXCEPT_KEY, which indicates that the message body does not contain the message key fields. This parameter is set only when the message key is not empty.

    • key.format: The format that the Flink Kafka connector uses to serialize or deserialize the Kafka message key. The value is json if the message key can be parsed, or raw if the key is not empty but cannot be parsed. This parameter is set only when the message key is not empty.

    • key.fields-prefix: A custom prefix for all Kafka message key fields, used to avoid name conflicts with message body fields. The value is taken from the key.fields-prefix parameter in the catalog configuration. This parameter is set only when the message key is not empty.

    • key.fields: The fields in which data parsed from the Kafka message key is stored. The list of parsed key fields is populated automatically. This parameter is set only when the message key is not empty and the table is not an Upsert Kafka table.
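
To tie the inferred columns, metadata columns, and default parameters together, the following is a rough, hypothetical sketch of the kind of table definition that a Kafka JSON catalog effectively produces for a topic whose key and value both parse successfully. The topic name, broker addresses, and field names are invented; the actual definition depends on your data.

    -- Hypothetical sketch only; the real definition is inferred automatically by the catalog.
    CREATE TEMPORARY TABLE inferred_topic_sketch (
      `key_id` BIGINT,                                         -- parsed from the message key, with the key_ prefix
      `value_name` STRING,                                     -- parsed from the message body, with the value_ prefix
      `value_ts` TIMESTAMP(3),                                 -- parsed from the message body
      `partition` INT NOT NULL METADATA VIRTUAL,               -- default metadata column
      `offset` BIGINT NOT NULL METADATA VIRTUAL,               -- default metadata column
      `timestamp` TIMESTAMP_LTZ(3) NOT NULL METADATA VIRTUAL   -- default metadata column
      -- When the table is used as a source table, partition and offset also serve as the default primary key.
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'testTopic',
      'properties.bootstrap.servers' = 'host1:9092,host2:9092,host3:9092',
      'value.format' = 'json',
      'value.fields-prefix' = 'value_',
      'value.fields-include' = 'EXCEPT_KEY',
      'key.format' = 'json',
      'key.fields-prefix' = 'key_',
      'key.fields' = 'key_id'
    );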