This topic describes how to use the Kafka connector.
Background information
Apache Kafka is an open-source distributed messaging system widely used in big data fields for high-performance data processing, streaming analytics, and data integration. The Kafka connector, based on the Apache Kafka client, offers high-performance data throughput, supports read and write operations in various formats, and provides exactly-once semantics for Realtime Compute for Apache Flink.
Category | Details |
Supported type | Source table and sink table, data ingestion sink |
Running mode | Stream mode |
Data format | CSV, JSON, Avro, Confluent Avro, Debezium JSON, Canal JSON, Maxwell JSON, and Raw. For details, see the format parameter in this topic. |
Monitoring metrics | For more information about the metrics, see Monitoring metrics description. |
API type | SQL, DataStream API, and data ingestion YAML API |
Support for updating or deleting data in sink tables | Data in sink tables cannot be updated or deleted. Data can only be inserted into sink tables. Note: If you need to update or delete data in sink tables, see Upsert Kafka. |
Prerequisites
You can connect to a cluster by using one of the following methods:
- Connect to an ApsaraMQ for Kafka cluster
  - An ApsaraMQ for Kafka cluster has been created. For more information, see Create resources.
  - The Flink workspace and the Kafka cluster are located within the same VPC, and Flink has been added to the whitelist of ApsaraMQ for Kafka. For more information, see how to configure a whitelist.
  Important: Limits on writing data into ApsaraMQ for Kafka:
  - ApsaraMQ for Kafka does not support the Zstandard data compression algorithm for data writing.
  - ApsaraMQ for Kafka does not support idempotent writes or transactional writes, so you cannot use the exactly-once semantics provided by Kafka sink tables. If you use Realtime Compute for Apache Flink VVR 8.0.0 or later, you must add the properties.enable.idempotence=false configuration item to the sink table to disable idempotent writes (see the example after this list). For more information about the storage engine comparison and limitations of ApsaraMQ for Kafka, see Storage engine comparison.
- Connect to a self-managed Apache Kafka cluster
  - The version of the self-managed Apache Kafka cluster is 0.11 or later.
  - A network connection is established between Flink and the self-managed Apache Kafka cluster. For more information about how to connect to a self-managed cluster over the Internet, see How to access the Internet?
  - Only client configuration items of Apache Kafka 2.8 are supported. For more information, see the Apache Kafka consumer and producer configuration documentation.
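The following DDL is a minimal sketch of a sink table that writes to ApsaraMQ for Kafka with idempotent writes disabled, as required on VVR 8.0.0 or later. The topic name, broker addresses, and schema are placeholders for illustration.
CREATE TEMPORARY TABLE mq_kafka_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  -- Required when writing to ApsaraMQ for Kafka on VVR 8.0.0 or later: disable idempotent writes.
  'properties.enable.idempotence' = 'false'
);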
Limits
Limits on using CREATE TABLE AS (CTAS)
- Only Realtime Compute for Apache Flink VVR 4.0.12 or later supports using Kafka as a data source for the CREATE TABLE AS statement.
- Only the JSON format is supported for type inference and schema changes. Other data formats are not supported.
- Only type inference and schema changes for the value fields of Kafka messages are supported. If you need to synchronize the key fields of Kafka messages, you must specify them in the DDL statement. For more information, see Example 3.
Limits on YAML deployments for data ingestion
- Only Realtime Compute for Apache Flink VVR 8.0.10 or later supports using Kafka as a data source for YAML deployments for data ingestion.
- Only the Debezium JSON and Canal JSON formats are supported. Other data formats are not supported.
- Only Realtime Compute for Apache Flink VVR 8.0.11 or later supports reading a source table whose data is distributed across multiple partitions.
Considerations
In Realtime Compute for Apache Flink VVR 8.0.11 or earlier, transactional writes are not recommended because of design defects in the Flink and Kafka communities. Transactional writes are used when the Kafka connector is configured with 'sink.delivery-guarantee' = 'exactly-once'. Transactional writes have the following issues:
- The current sink implementation of the Kafka connector generates a transaction ID for each checkpoint of a Flink job to commit the transaction of data writes. If the checkpoint interval of the Flink job is set to a small value, the Coordinator component of the downstream Kafka cluster may run out of memory because of too many transaction IDs, which affects the stability of the entire downstream Kafka cluster.
- The current sink implementation of the Kafka connector creates a producer instance for each transaction. If many Kafka write transactions are committed at the same time, the TaskManager component of the Flink job may run out of memory because of too many producer instances, which affects the stability of the current Flink job.
- If multiple Flink jobs use the same transaction ID prefix (sink.transactional-id-prefix), the transaction IDs generated by different Flink jobs based on the prefix and the checkpoint ID may affect each other. If an exception occurs in a Flink job, the Log Start Offset (LSO) of partitions in the Kafka cluster may be blocked, which affects data consumption in each partition of the Kafka cluster.
If you need exactly-once semantics, you can write data to a primary key table using Upsert Kafka to ensure idempotence with primary keys.
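For reference, the following sketch shows an Upsert Kafka sink table that relies on a primary key for idempotent writes. The schema, topic name, and broker addresses are placeholders.
CREATE TEMPORARY TABLE upsert_kafka_sink (
  user_id BIGINT,
  cnt BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  -- Upsert Kafka requires both a key format and a value format.
  'key.format' = 'json',
  'value.format' = 'json'
);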
Troubleshoot network connectivity
If your Flink job fails to start and the error Timed out waiting for a node assignment is reported, the cause is usually a network connectivity issue between Flink and Kafka.
The process of establishing a connection between a Kafka client and a Kafka broker is as follows:
1. The client initiates a connection to a broker by using the IP address and port number specified by the properties.bootstrap.servers parameter. The broker then returns the metadata of each broker in the cluster, including its endpoint, to the client.
2. The client uses the returned endpoint to connect to a broker to produce or consume data.
If the Kafka broker is not properly configured, the endpoint that the client receives in the first step is incorrect. Even if the IP address and port number specified by the properties.bootstrap.servers parameter are reachable, data cannot be read or written as expected. This issue typically occurs when a forwarding mechanism, such as a proxy, port forwarding, or a private line, is used to establish the connection between Flink and Kafka.
You can perform the following steps to check whether the configuration of your Kafka cluster is valid:
1. Use the ZooKeeper command-line tool (zkCli.sh or zookeeper-shell.sh) to log on to the ZooKeeper service used by your Kafka cluster.
2. Run a command based on the information about your Kafka cluster to obtain the metadata of your Kafka broker. In most cases, you can run the get /brokers/ids/0 command. The endpoint of the Kafka broker is in the endpoints field. This is the endpoint that the broker returns to the client during the connection process.
3. Use commands such as ping or telnet to test the connectivity between Flink and the endpoint in the endpoints field. If the endpoint cannot be reached, contact the O&M engineers of Kafka to configure the listeners and advertised.listeners properties of the Kafka broker for Flink.
For more information about the connection between a Kafka client and a Kafka broker, see Troubleshoot Connectivity.
SQL
The Kafka connector can be used in SQL drafts to read data from and write data into Kafka topics.
Syntax structure
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
Metadata columns
You can define metadata columns in a source table or a sink table to obtain the metadata of Kafka messages. For example, if multiple topics are defined in the WITH clause of a Kafka source table and a metadata column is defined in the source table, each row that Flink reads is marked with the topic from which it was read. The following sample code shows how to use metadata columns.
CREATE TABLE kafka_source (
-- Read the topic to which the message belongs as the value of the record_topic field
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
-- Read the timestamp in ConsumerRecord as the value of the ts field
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
-- Read the offset of the message as the value of the record_offset field
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
-- Write the timestamp in the ts field as the timestamp of ProducerRecord to Kafka
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
The following table describes the metadata columns supported by Kafka source tables and sink tables.
Key | Data type | Description | Source table or sink table |
topic | STRING NOT NULL METADATA VIRTUAL | The name of the topic to which the Kafka message belongs. | Source table |
partition | INT NOT NULL METADATA VIRTUAL | The ID of the partition to which the Kafka message belongs. | Source table |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Headers of the Kafka message. | Source table and sink table |
leader-epoch | INT NOT NULL METADATA VIRTUAL | The leader epoch of the Kafka message. | Source table |
offset | BIGINT NOT NULL METADATA VIRTUAL | The offset of the Kafka message. | Source table |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | The timestamp of the Kafka message. | Source table and sink table |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | The timestamp type of the Kafka message. Valid values: NoTimestampType, CreateTime, and LogAppendTime. | Source table |
With parameters
-
General
Parameter
Description
Data type
Required
Default value
Remarks
connector
The type of the table.
String
Yes
None
Set the value to kafka.
properties.bootstrap.servers
The addresses of Kafka brokers.
String
Yes
None
Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).
properties.*
The parameters that are configured for the Kafka client.
String
No
None
The suffix must be defined in the Kafka official documentation for producer and consumer configurations.
Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can disable automatic topic creation by using
'properties.allow.auto.create.topics'='false'
.You cannot modify the configurations of the following parameters by adding the properties. prefix because the values of the parameters are overwritten after you use the Kafka connector:
key.deserializer
value.deserializer
format
The format that is used to read or write the value fields of Kafka messages.
String
No
None
Supported formats
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
NoteFor more information about the format parameter, see Format parameter.
key.format
The format that is used to read or write the key fields of Kafka messages.
String
No
None
Supported formats
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
Note The key.fields parameter is required when you configure this parameter.
key.fields
The key fields in the source table or sink table that correspond to the key fields of Kafka messages.
String
No
None
Separate multiple field names with semicolons (;). For example,
field1;field2
key.fields-prefix
The custom prefix for all key fields in Kafka messages. You can configure this parameter to prevent name conflicts with the value fields.
String
No
None
This parameter is used only to distinguish the column names of source tables and sink tables. The prefix is removed from the column names when the key fields of Kafka messages are parsed or generated.
NoteThe value.fields-include parameter must be set to EXCEPT_KEY when you configure this parameter.
value.format
The format that is used to read or write the value fields of Kafka messages.
String
No
None
This parameter is equivalent to the format parameter. Therefore, only one of the format and value.format parameters can be configured. If both parameters are configured, a conflict occurs.
value.fields-include
Specifies whether to include corresponding message keys when parsing or generating Kafka message values.
String
No
ALL
Valid values:
ALL: All fields are processed as value fields of Kafka messages. This is the default value.
EXCEPT_KEY: All fields except the fields specified by the key.fields parameter are processed as the value fields of Kafka messages. For an example of how the key and value parameters work together, see the sketch after this table.
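The following sketch illustrates how the key.format, key.fields, key.fields-prefix, and value.fields-include parameters described above map table columns to the key and value of a Kafka message. The topic, broker addresses, and column names are placeholders for illustration.
CREATE TEMPORARY TABLE keyed_kafka_source (
  -- Columns with the key_ prefix are read from the message key; the prefix is removed when the key is parsed.
  key_id INT,
  -- The remaining columns are read from the message value.
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'key.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.format' = 'json',
  -- EXCEPT_KEY is required when key.fields-prefix is configured.
  'value.fields-include' = 'EXCEPT_KEY'
);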
-
Source table
Parameter
Description
Data type
Required
Default value
Remarks
topic
The name of the topic from which you want to read data.
String
No
None
Separate multiple topic names with semicolons (;), such as topic-1;topic-2
NoteThe topic parameter cannot be used together with the topic-pattern parameter.
topic-pattern
The regular expression that is used to match topics. When a job is running, all topics whose names match the specified regular expression are subscribed by the consumer.
String
No
None
NoteOnly Realtime Compute for Apache Flink VVR 3.0.0 or later supports this parameter.
The topic parameter cannot be used together with the topic-pattern parameter.
properties.group.id
The ID of the consumer group.
String
No
KafkaSource-{Name of the source table}
If the specified group ID is used for the first time, you must set the properties.auto.offset.reset parameter to earliest or latest to specify the start offset for the first consumption.
scan.startup.mode
The start offset from which data is read from Kafka.
String
No
group-offsets
Valid values:
earliest-offset: starts reading from the earliest offset of each Kafka partition.
latest-offset: starts reading from the latest offset of each Kafka partition.
group-offsets (default value): starts reading from the offset committed by the consumer group specified by the properties.group.id parameter.
timestamp: starts reading from the timestamp specified by the scan.startup.timestamp-millis parameter.
specific-offsets: starts reading from the offsets specified by the scan.startup.specific-offsets parameter.
Note This parameter takes effect when the deployment is started without states. When the deployment is restarted from a checkpoint or resumed from the specified state, the deployment preferentially starts to read data from the progress that is saved in the state data.
scan.startup.specific-offsets
The start offset of each partition when the scan.startup.mode parameter is set to specific-offsets.
String
No
None
For example,
partition:0,offset:42;partition:1,offset:300
scan.startup.timestamp-millis
The timestamp of the start offset when the scan.startup.mode parameter is set to timestamp.
Long
No
None
Unit: milliseconds
scan.topic-partition-discovery.interval
The time interval at which the Kafka source checks for new partitions.
Duration
No
5 minutes
The default partition discovery interval is 5 minutes. To disable the dynamic partition discovery feature, you must explicitly set this parameter to a non-positive value. After the dynamic partition discovery feature is enabled, the Kafka source can automatically discover new partitions and read data from the partitions. In topic-pattern mode, the Kafka source reads data from new partitions of existing topics and data from all partitions of new topics that match the regular expression.
NoteFor Realtime Compute for Apache Flink that uses VVR 6.0.X, the dynamic partition discovery feature is disabled by default. For Realtime Compute for Apache Flink that uses VVR 8.0 or later, this feature is enabled by default. The default partition discovery interval is 5 minutes.
scan.header-filter
Filters Kafka records based on whether they contain a specific message header.
String
No
None
Separate a header key and its value with a colon (:). Combine multiple conditions with the logical operators AND (&) and OR (|). The logical operator NOT (!) is also supported. For example, depart:toy|depart:book&!env:test retains Kafka records that contain depart=toy or depart=book and do not contain env=test. For an example, see the sketch after this table.
Note Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.
Parentheses are not supported.
Logical operations are evaluated from left to right.
Header values are decoded as UTF-8 strings and compared with the values specified in the scan.header-filter parameter.
scan.check.duplicated.group.id
Specifies whether to check duplicate consumer groups that are specified by the
properties.group.id
parameter.Boolean
No
false
Valid values:
true: Checks duplicate consumer groups before a deployment starts. If duplicate consumer groups exist, an error is reported to prevent conflicts with existing consumer groups.
false: Does not check duplicate consumer groups before a deployment starts.
NoteOnly Realtime Compute for Apache Flink that uses VVR 6.0.4 or later supports this parameter.
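The following source table sketch shows how the scan.header-filter parameter from the table above might be configured. The topic, schema, and header conditions are illustrative only.
CREATE TEMPORARY TABLE filtered_kafka_source (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'json',
  -- Keep only records whose headers contain depart=toy or depart=book but not env=test.
  'scan.header-filter' = 'depart:toy|depart:book&!env:test'
);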
-
Sink table
Parameter
Description
Data type
Required
Default value
Remarks
topic
The name of the topic into which data is written.
String
Yes
None
None
sink.partitioner
The mapping pattern between Flink partitions and Kafka partitions.
String
No
default
Valid values:
default: The default Kafka partitioner is used to partition data. This is the default value.
fixed: Each Flink partition corresponds to a fixed Kafka partition.
round-robin: Data in a Flink partition is distributed to the Kafka partitions in round-robin sequence.
Custom partition mapping pattern: You can create a subclass of FlinkKafkaPartitioner to configure a custom partition mapping pattern if the fixed and round-robin patterns do not meet your business requirements. For example, org.mycompany.MyPartitioner
sink.delivery-guarantee
The delivery semantics for the Kafka sink table.
String
No
at-least-once
Valid values:
none: The delivery semantics is not ensured. Data may be lost or duplicated.
at-least-once: The at-least-once semantics is provided to ensure that no data is lost. However, duplicate data may exist. This is the default value.
exactly-once: Kafka transactions are used to ensure the exactly-once semantics. This ensures that data is not lost or duplicated.
NoteThe sink.transactional-id-prefix parameter is required when you use the exactly-once semantics.
sink.transactional-id-prefix
The prefix of the Kafka transaction ID that is used in the exactly-once semantics.
String
No
None
This parameter takes effect only when the sink.delivery-guarantee parameter is set to exactly-once. For a complete example, see the sketch after these parameter tables.
sink.parallelism
The parallelism of the Kafka sink operator.
Integer
No
None
By default, the sink uses the same parallelism as its upstream operators, as determined by the framework.
-
Parameters for data synchronization using the CREATE TABLE AS statement
Parameter
Description
Data type
Required
Default value
Remarks
json.infer-schema.flatten-nested-columns.enable
Specifies whether to recursively expand nested columns in a JSON text.
Boolean
No
false
Valid values:
true: Nested columns are recursively expanded. Realtime Compute for Apache Flink uses the path that indexes the value of the expanded column as the column name. For example, the column col in the JSON text {"nested": {"col": true}} is named nested.col after it is expanded.
false: Nested types are parsed as the STRING type. This is the default value.
json.infer-schema.primitive-as-string
Specifies whether to infer all basic types as the STRING type.
Boolean
No
false
Valid values:
true: All basic types are inferred as the STRING type.
false: Data types are inferred based on the data type mappings.
You can add the properties. prefix to all configuration items supported by Kafka consumers and producers and use the configuration items in the WITH clause. For example, to set the request.timeout.ms parameter of a Kafka consumer or producer to 60000 milliseconds, configure 'properties.request.timeout.ms' = '60000' in the WITH clause. For more information about the configuration items of Kafka consumers and producers, see the Apache Kafka official documentation.
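Based on the sink table parameters above, the following sketch shows a sink table that enables exactly-once delivery with a transaction ID prefix. The topic, schema, and prefix are placeholders, and the considerations about transactional writes earlier in this topic still apply.
CREATE TEMPORARY TABLE exactly_once_kafka_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  -- Use Kafka transactions to ensure exactly-once delivery.
  'sink.delivery-guarantee' = 'exactly-once',
  -- Required for exactly-once: a transaction ID prefix that is unique among your Flink deployments.
  'sink.transactional-id-prefix' = '<yourUniquePrefix>'
);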
Security and authentication
If your Kafka cluster requires secure connections or authentication, add the related security and authentication configuration items with the properties.
prefix and configure the configuration items in the WITH clause. The following sample code provides an example of how to configure a Kafka table to use PLAIN as the SASL mechanism and provide JAAS configurations.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)
The following sample code provides an example of how to configure a Kafka table to use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/* Configure Secure Sockets Layer (SSL). */
/* Specify the path of the CA certificate truststore provided by the server. */
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/* Specify the path of the private key file keystore if client authentication is required. */
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/* The algorithm used by the client to verify the server address. A null value indicates that server address verification is disabled. */
'properties.ssl.endpoint.identification.algorithm' = '',
/* Configure SASL. */
/* Configure SCRAM-SHA-256 as the SASL mechanism. */
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/* Configure JAAS. */
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)
The CA certificate and private key mentioned in the sample code can be uploaded to the platform using the File Management feature in the Realtime Compute console. For example, if the files are uploaded to the /flink/usrlib
directory and the CA certificate file is named my-truststore.jks, you can specify 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks'
in the WITH clause to use the certificate.
-
The preceding code snippets provide examples of the configurations used in most cases. Before configuring the Kafka connector, contact Kafka server O&M personnel to obtain the correct security and authentication configuration information.
-
Unlike open source Flink, the SQL editor of fully managed Flink automatically escapes double quotation marks (") by default. Therefore, you do not need to add additional escape characters (\) to the double quotation marks (") when configuring the
properties.sasl.jaas.config
parameter.
Start offsets for source tables
Startup modes
You can configure the scan.startup.mode parameter to specify the start offset for a Kafka source table:
- earliest-offset: reads data from the earliest offset of each partition.
- latest-offset: reads data from the latest offset of each partition.
- group-offsets: reads data from the offset committed by the consumer group whose ID is specified by the properties.group.id parameter.
- timestamp: reads data from the first record of each partition whose timestamp is greater than or equal to the timestamp specified by the scan.startup.timestamp-millis parameter.
- specific-offsets: reads data from the offsets specified by the scan.startup.specific-offsets parameter.
Note:
- If you do not specify the start offset, the Kafka source table begins reading data from the offset committed by the consumer group with the specified ID.
- The scan.startup.mode parameter is effective only when the deployment starts without states. When the deployment starts with states, it begins reading data from the offset stored in the state data.
The following sample code provides an example:
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
-- Consume data from the earliest offset.
'scan.startup.mode' = 'earliest-offset',
-- Consume data from the latest offset.
'scan.startup.mode' = 'latest-offset',
-- Consume data from the offset that is committed by the consumer group my-group.
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest', -- If my-group is used for the first time, the consumption starts from the earliest offset.
'properties.auto.offset.reset' = 'latest', -- If my-group is used for the first time, the consumption starts from the latest offset.
-- Consume data from the timestamp 1655395200000, in milliseconds.
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
-- Consume data from the specified offset.
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
Priorities of start offsets
The source table consumes data based on the following priorities of start offsets, in descending order:
1. The offset stored in checkpoints or savepoints.
2. The start time specified in the Realtime Compute for Apache Flink console.
3. The start offset specified by the scan.startup.mode parameter in the WITH clause.
4. group-offsets, which is used if the scan.startup.mode parameter is not specified.
If the offset is invalid due to expiration or issues in the Kafka cluster at any of the preceding steps, the offset is reset based on the policy specified by the properties.auto.offset.reset parameter. If this parameter is not configured, an exception is returned and manual intervention is required.
Typically, the Kafka source table starts reading data from the offset committed by a consumer group with a new group ID. The source table queries the Kafka cluster for the committed offset of the group. Since the group ID is new, no valid offset is returned, and the offset is reset based on the policy specified by the properties.auto.offset.reset parameter. Therefore, you must configure the properties.auto.offset.reset parameter to specify the offset reset policy when consuming data with a new group ID.
Offset commit for source tables
The Kafka source table commits a consumer offset to the Kafka cluster only after a successful checkpoint. If the checkpoint interval is excessively large, the consumer offset is delayed in being committed to the Kafka cluster. When a checkpoint is created, the Kafka source table stores the current data reading progress in the state backend. The offset committed to the Kafka cluster is not used for fault recovery but only to monitor the data reading progress in Kafka. If the offset fails to be committed, data accuracy is not affected.
Custom partitioner for sink tables
If the built-in Kafka producer does not meet your business requirements, you can use a custom Kafka partitioner to write data to the relevant partitions. A custom partitioner must inherit from FlinkKafkaPartitioner. After development, compile the JAR package and upload it to the Realtime Compute console using the File Management feature. Once uploaded and referenced, set the sink.partitioner parameter in the WITH clause to the full class path of the partitioner, such as org.mycompany.MyPartitioner
.
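A minimal sketch of such a partitioner follows. The package and class name match the org.mycompany.MyPartitioner example above, and the key-hash routing logic is for illustration only.
package org.mycompany;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

/** Hypothetical partitioner that routes records to Kafka partitions by hashing the message key. */
public class MyPartitioner extends FlinkKafkaPartitioner<String> {

    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Fall back to the first available partition when the record has no key.
        if (key == null) {
            return partitions[0];
        }
        // Mask the sign bit so that the computed index is always non-negative.
        return partitions[(java.util.Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
    }
}
After you upload the compiled JAR package, reference it in the WITH clause with 'sink.partitioner' = 'org.mycompany.MyPartitioner'.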
Selection of Kafka, Upsert Kafka, or Kafka JSON catalog
Kafka is a message queue system designed exclusively for data insertion, lacking the capability to update or delete data. Consequently, Kafka is unable to handle Change Data Capture (CDC) from upstream systems or the retraction mechanisms of operators like aggregate and join in streaming SQL computations. To write data involving changes or retractions to Kafka, utilize an Upsert Kafka sink table, which manages change data in a specialized way.
If you want to synchronize change data from one or more upstream database tables to Kafka in batches, you can use a Kafka JSON catalog. If data in Kafka is in JSON format, a Kafka JSON catalog allows you to avoid configuring the schema and parameters in the WITH clause. For more information, see Manage Kafka JSON Catalog.
As a CTAS data source
The CREATE TABLE AS statement can perform data synchronization from a Kafka source table in JSON format. During data synchronization, if specific fields are not present in a predefined table schema, Flink attempts to automatically infer the data types of the columns. If the inferred data types do not meet your business requirements, you can perform auxiliary type inference to declare the data types of the columns.
For more information about the JSON format, see JSON Format.
-
Type inference
By default, Flink displays only the first layer of data in the JSON text during type inference. Flink infers the SQL data types based on the JSON data types and values and the data type mappings. The following table lists the mappings between JSON and Flink SQL data types.
JSON type
Flink SQL type
BOOLEAN
BOOLEAN
STRING
DATE, TIMESTAMP, TIMESTAMP_LTZ, TIME, or STRING
INT or LONG
BIGINT
BIGINT
DECIMAL or STRING
NoteThe precision of the DECIMAL type in Flink is limited. If the value of an integer exceeds the maximum precision of the DECIMAL type, Flink determines the data type of the value to be STRING to prevent loss of precision.
FLOAT, DOUBLE, or BIG DECIMAL
DOUBLE
ARRAY
STRING
OBJECT
STRING
Example
-
JSON text
{ "id": 101, "name": "VVP", "properties": { "owner": "Alibaba Cloud", "engine": "Flink" } "type": ["Big data"] }
-
The following table is written into the downstream system:
id
name
properties
type
101
VVP
{ "owner": "Alibaba Cloud", "engine": "Flink" }
["Big data"]
-
-
Auxiliary type inference
If type inference based on the above rules does not meet your needs, you can explicitly declare the data type of a specific column in the source table's DDL statement. If you use this method, Flink preferentially parses the field you want to use as the data type of the column declared in the DDL statement. In the following example, Flink parses the price field as the DECIMAL type instead of converting the field to the DOUBLE type based on the data type mappings.
CREATE TABLE evolvingKafkaSource (
  price DECIMAL(18, 2)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'localhost:9092',
  'topic' = 'evolving_kafka_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
However, if the data type you specify in the DDL statement differs from the actual data type, you can use one of the following methods to handle the issue:
-
If a column's declared data type has a wider range than the actual data type, the field is parsed according to the declared data type. For example, if a column declared as DOUBLE is used to hold BIGINT data, the column's data type is parsed as DOUBLE.
-
If the actual data type has a wider range than or is incompatible with the declared data type, an error is returned because CREATE TABLE AS does not support type changes. To resolve this, you must restart your deployment and declare a correct data type.
The following figure shows the tree structure of data type ranges and compatibility.
Note-
In the above figure, data types closer to the root node have wider ranges. Data types on different branches are incompatible.
-
Auxiliary type inference is not supported for complex data types such as ROW, ARRAY, MAP, and MULTISET.
-
By default, Flink parses complex data types as STRING.
-
-
In most cases, the JSON text in Kafka topics has a nested structure. If you want to extract nested columns from JSON texts, you can use one of the following methods:
-
Declare
'json.infer-schema.flatten-nested-columns.enable'='true'
in the DDL statement of the source table to expand all elements in nested columns to the top level. This way, all nested columns are expanded in sequence. To prevent column name conflicts, Flink uses the path that indexes the column as the name of the column after the column is expanded.ImportantColumn name conflicts cannot be directly resolved. However, you can work around this by setting 'json.ignore-parse-errors' to 'true' in the source table's DDL statement to ignore conflicts.
-
Add a computed column
`rowkey` AS JSON_VALUE(`properties`, `$.rowkey`)
in the CTAS syntax in the DDL statement to specify the column you want to expand. For more information, see Example 4: Synchronize table schema and data and perform calculations.
Examples
Example 1: read data from Kafka and write data to Kafka
Flink reads data from a Kafka topic named source and then writes the data to a topic named sink. The data is in CSV format.
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;
Example 2: synchronize table schema and data
Use the Kafka connector to synchronize messages from a Kafka topic to Hologres in real time. In this case, you can use the offset and partition IDs of Kafka messages as primary keys. This ensures no duplicate messages exist in Hologres if a failover occurs.
CREATE TEMPORARY TABLE kafkaTable (
`offset` BIGINT NOT NULL METADATA,
`part` INT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
-- Optional. Expand all nested columns.
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Example 3: synchronize table schema and key and value data of Kafka messages
The key fields of Kafka messages store relevant information. You can synchronize data in the key and value columns of Kafka messages simultaneously.
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
The key columns in a Kafka message do not support table schema changes and type inference. Manual declaration is required.
Example 4: synchronize table schema and data and perform calculations
When synchronizing data from Kafka to Hologres, lightweight calculation is required.
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- Use COALESCE to handle null values.
DataStream API
When reading and writing data using DataStream, you must use the corresponding DataStream connector to connect to fully managed Flink. For more information on how to configure a DataStream connector, see DataStream connector configuration.
-
Create a Kafka source
The Kafka source provides a builder class for constructing the instance of KafkaSource. The following sample code shows how to create a Kafka source to consume data from the earliest offset of the input-topic. The name of the consumer group is my-group. The Kafka message body is deserialized into a string.
Java
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
XML
The Kafka DataStream connector is available in the Maven central repository.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-kafka</artifactId>
    <version>${vvr-version}</version>
</dependency>
You must configure the following parameters when creating a Kafka source.
Parameter
Description
BootstrapServers
The addresses of Kafka brokers. You can call the setBootstrapServers(String) operation to configure the addresses.
GroupId
The ID of the consumer group. You can call the setGroupId(String) method to configure the ID.
Topics or Partition
The topics or names of the partitions to which you subscribe. You can configure a Kafka source to subscribe to topics or partitions by using one of the following subscription patterns:
Topic list. After you configure a topic list, the Kafka source subscribes to all partitions of the specified topics.
KafkaSource.builder().setTopics("topic-a","topic-b")
Topic pattern. After you specify a regular expression, the Kafka source subscribes to all partitions of the topics that match the specified regular expression.
KafkaSource.builder().setTopicPattern("topic.*")
Partition list. After you configure a partition list, the Kafka source subscribes to the specified partitions.
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList( new TopicPartition("topic-a", 0), // Partition 0 of topic "topic-a" new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b" KafkaSource.builder().setPartitions(partitionSet)
Deserializer
A deserializer that deserializes Kafka messages.
You can call the setDeserializer(KafkaRecordDeserializationSchema) method to specify a deserializer. The KafkaRecordDeserializationSchema interface defines how a ConsumerRecord object is deserialized. If you want to deserialize only the value fields in the Kafka messages of the ConsumerRecord object, you can use one of the following methods:
A Kafka source provides the setValueOnlyDeserializer(DeserializationSchema) method. The DeserializationSchema class defines how a Kafka message that is stored as a binary value is deserialized.
Use a deserializer provided by Kafka. Multiple implementation classes are available. For example, you can use the StringDeserializer class to deserialize a message into a string.
import org.apache.kafka.common.serialization.StringDeserializer; KafkaSource.<String>builder() .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
Note If you want to deserialize the entire ConsumerRecord object, you must create a class that implements the KafkaRecordDeserializationSchema interface, as shown in the sketch after this table.
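The following is a minimal sketch of such a custom deserializer. The class name and the "topic|value" output format are illustrative only, and package names may differ slightly across connector versions.
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Hypothetical deserializer that emits each record as a "topic|value" string. */
public class TopicAndValueDeserializationSchema
        implements KafkaRecordDeserializationSchema<String> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out)
            throws IOException {
        // The full ConsumerRecord is available here: topic, partition, offset, headers, key, and value.
        String value = record.value() == null
                ? ""
                : new String(record.value(), StandardCharsets.UTF_8);
        out.collect(record.topic() + "|" + value);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // Tells Flink the type of the records produced by this deserializer.
        return Types.STRING;
    }
}
You can then pass an instance to the builder, for example KafkaSource.<String>builder().setDeserializer(new TopicAndValueDeserializationSchema()).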
When using a Kafka DataStream connector, you must configure the following Kafka properties:
-
Start offset
You can use an offset initializer to specify an offset for a Kafka source when it starts reading data. An offset initializer is an object based on the OffsetsInitializer interface. The KafkaSource class provides the following built-in offset initializers.
Offset initializer
Code configuration
Specifies that the Kafka source starts to consume messages from the earliest record of each partition.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())
Specifies that the Kafka source starts to consume messages from the latest record of each partition.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())
Specifies that the Kafka source starts to consume messages from the first record of each partition. The first record has a timestamp that is greater than or equal to the specified timestamp. Unit: milliseconds.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))
Specifies that the Kafka source starts to consume messages from the committed offset of each partition and a reset strategy is specified. If a partition does not have a committed offset, the reset strategy resets the offset and the Kafka source starts from the earliest record of the partition.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
Specifies that the Kafka source starts to consume messages from the committed offset of each partition and no reset strategy is specified.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())
Note-
If the built-in offset initializers do not meet your business requirements, you can create custom offset initializers.
-
If you do not specify an offset initializer, the OffsetsInitializer.earliest() offset initializer is used by default.
-
-
Streaming mode and batch mode
A Kafka source can operate in streaming execution mode or batch execution mode. By default, a Kafka source operates in streaming execution mode, where the job continues to run until it fails or is canceled. If you want a Kafka source to operate in batch execution mode, call the setBounded(OffsetsInitializer) method to specify a stop offset. When all partitions reach their stop offsets, the Kafka source exits.
NoteTypically, a Kafka source operating in streaming execution mode does not have a stop offset. If you want to debug a Kafka source in streaming execution mode, call the setUnbounded(OffsetsInitializer) method to specify a stop offset. The methods to specify a stop offset vary based on whether you use streaming execution mode or batch execution mode.
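For example, the following sketch (topic, group, and broker names are placeholders) runs the source in batch execution mode and stops once the offsets that are the latest at startup are reached.
KafkaSource<String> boundedSource = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    // Stop at the latest offsets that exist when the job starts, which makes the source bounded.
    .setBounded(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();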
-
Dynamic partition discovery
If you want a running deployment to process data from new topics and from new partitions that match your subscription pattern without needing to restart the deployment, enable the dynamic partition discovery feature on the Kafka source.
NoteBy default, the dynamic partition discovery feature is enabled, with an interval of 5 minutes. To disable this feature, explicitly set the partition.discovery.interval.ms parameter to a non-positive value. The following sample code provides an example of how to configure the partition.discovery.interval.ms parameter.
KafkaSource.builder() .setProperty("partition.discovery.interval.ms", "10000") // Check for new partitions every 10 seconds.
-
Event time and watermarks
By default, a Kafka source uses the timestamp attached to a record as the event time for the record. You can define a watermark strategy based on the event time of each record and send the watermarks to downstream services.
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
For more information about custom watermark strategies, see Generating Watermarks.
-
Offset commit
When a checkpoint is generated, a Kafka source commits the Kafka consumer offset of each partition to Kafka brokers. This ensures that the Kafka consumer offsets recorded on Kafka brokers are consistent with the state of the checkpoint. If checkpointing is disabled, a Kafka source relies on the Kafka consumer to commit the offsets to Kafka brokers. The automatic offset commit feature is configured by using the enable.auto.commit and auto.commit.interval.ms parameters, as shown in the sketch below.
NoteKafka sources do not use the committed offsets recorded on Kafka brokers for fault tolerance. When you commit offsets, Kafka brokers can monitor the progress of record consumption on each partition.
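The following sketch shows how automatic offset commit might be enabled on the consumer when checkpointing is disabled. The topic, group, broker names, and interval are illustrative.
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setValueOnlyDeserializer(new SimpleStringSchema())
    // Let the Kafka consumer commit offsets on its own when Flink checkpointing is disabled.
    .setProperty("enable.auto.commit", "true")
    // Commit offsets every 5 seconds.
    .setProperty("auto.commit.interval.ms", "5000")
    .build();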
-
Additional properties
In addition to the properties mentioned above, you can call the setProperties(Properties) and setProperty(String, String) methods to configure additional properties for the Kafka source and Kafka consumer. The following table describes the properties of a Kafka source.
Property
Description
client.id.prefix
Specifies the prefix for the client ID of the Kafka consumer.
partition.discovery.interval.ms
Specifies the time interval at which the Kafka source checks for new partitions.
NoteIf the Kafka source operates in batch execution mode, the property is automatically set to -1.
register.consumer.metrics
Specifies whether to register metrics for the Kafka consumer in Realtime Compute for Apache Flink.
Additional properties for the Kafka consumer
For more information about the configuration items of Kafka consumers, see Apache Kafka.
ImportantThe Kafka DataStream connector overwrites the values of the following properties:
key.deserializer: The value of this property is set to ByteArrayDeserializer.
value.deserializer: The value of this property is set to ByteArrayDeserializer.
auto.offset.reset.strategy: The value of this property is set to OffsetsInitializer#getAutoOffsetResetStrategy().
The following sample code shows how the Kafka consumer connects to the Kafka cluster using a JAAS configuration and the SASL/PLAIN authentication mechanism.
KafkaSource.builder() .setProperty("sasl.mechanism", "PLAIN") .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
-
Monitoring
Kafka sources register metrics in Realtime Compute for Apache Flink for monitoring and diagnosis.
-
Metric scope
All metrics of a Kafka source are registered under the KafkaSourceReader metric group, which is a subgroup of the operator metric group. Metrics for a specific partition are registered in the KafkaSourceReader.topic.<topic_name>.partition.<partition_id> metric group.
For example, for a topic named my-topic and a partition named 1, the consumer offset of the partition is reported by the <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset metric. The number of successful commits of consumer offsets is measured by the <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded metric.
-
Metrics
Metric name
Description
Scope
currentOffset
Reports the Kafka consumer offset of a partition.
TopicPartition
committedOffset
Reports the committed consumer offset of a partition.
TopicPartition
commitsSucceeded
Reports the number of successful commits of consumer offsets.
KafkaSourceReader
commitsFailed
Reports the number of failed commits of consumer offsets.
KafkaSourceReader
-
Metrics for a Kafka consumer
The metrics for a Kafka consumer are registered in the KafkaSourceReader.KafkaConsumer metric group. For example, the records-consumed-total metric is registered at <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.
You can configure the register.consumer.metrics parameter to specify whether to register metrics for the Kafka consumer. By default, this parameter is set to true. For more information about the metrics for a Kafka consumer, see Apache Kafka.
-
-
Create a Kafka sink
A Kafka sink can write data from multiple streams to one or more Kafka topics.
DataStream<String> stream = ...;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokers);

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setKafkaProducerConfig(properties)                      // Producer configuration, including bootstrap.servers.
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("my-topic")                            // Target topic.
            .setKafkaValueSerializer(StringSerializer.class) // Serialization schema for the message value.
            .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)    // Fault tolerance semantics.
    .build();

stream.sinkTo(kafkaSink);
You must configure the following parameters when creating a Kafka sink.
Parameter
Description
Topic
The name of the topic to which data is written.
Serialization schema
You must provide a
KafkaRecordSerializationSchema
to convert input data into aProducerRecord
object of Kafka when you create a Kafka sink. Realtime Compute for Apache Flink provides a schema builder that provides some common components, such as message key or message value serialization, topic selection, and message partitioning. You can also implement the corresponding interfaces for more advanced control. A Kafka sink calls the ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) method for each incoming record to generate a ProducerRecord object that represents the serialized record. Then, the Kafka sink writes the ProducerRecord object to the required topic.The ProducerRecord class provides properties that you can configure to manage behavior when the Kafka producer writes a record to the required Kafka topic. You can configure the following properties of the ProducerRecord class:
Set the name of the topic to which the record is written.
Define the key for the record.
Specify the name of the partition to which the record is written.
Properties of the Kafka client
The bootstrap.servers property is required. This property specifies the addresses of the Kafka brokers. Separate multiple addresses with a comma (,).
Fault tolerance semantics
After you enable the checkpointing feature, a Kafka sink can ensure exactly-once delivery. In addition to enabling the checkpointing feature, you can configure the DeliveryGuarantee parameter of a Kafka sink to specify one of the following fault tolerance semantics:
DeliveryGuarantee.NONE: By default, the Kafka sink does not ensure data delivery. Data may be lost or duplicated.
DeliveryGuarantee.AT_LEAST_ONCE: The Kafka sink ensures that data is not lost. However, data may be duplicated.
DeliveryGuarantee.EXACTLY_ONCE: The Kafka sink ensures that data is not lost or duplicated. The Kafka transaction mechanism is used to ensure exactly-once delivery.
NoteFor more information about the considerations for using the exactly-once semantics, see Usage notes of Semantic.EXACTLY_ONCE.
Data ingestion
The Kafka connector can be used as a source or a sink to develop YAML drafts for data ingestion.
Syntax structure
source:
type: kafka
name: Kafka source
properties.bootstrap.servers: localhost:9092
topic: ${kafka.topic}
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: localhost:9092
Configuration items
-
General
Parameter
Description
Required
Data type
Default value
Remarks
type
The type of the source or sink.
Yes
String
None
Set the value to kafka.
name
The name of the source or sink.
No
String
None
None
properties.bootstrap.servers
The addresses of Kafka brokers.
Yes
String
None
Format:
host:port,host:port,host:port
. Separate multiple host:port pairs with commas (,).properties.*
The parameters that are configured for the Kafka client.
No
String
None
The suffix must be defined in the Kafka official documentation for producer and consumer configurations.
Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can disable automatic topic creation by using
'properties.allow.auto.create.topics' = 'false'
.value.format
The format that is used to read or write the value fields of Kafka messages.
No
String
debezium-json
Valid values:
debezium-json
canal-json
-
Source table
Parameter
Description
Required
Data type
Default value
Remarks
topic
The name of the topic from which you want to read data.
No
String
None
Separate multiple topic names with semicolons (;), such as topic-1;topic-2
NoteThe topic parameter cannot be used together with the topic-pattern parameter.
topic-pattern
The regular expression that is used to match topics. When a job is running, all topics whose names match the specified regular expression are subscribed by the consumer.
No
String
None
NoteThe topic parameter cannot be used together with the topic-pattern parameter.
properties.group.id
The ID of the consumer group.
No
String
None
If the specified group ID is used for the first time, you must set the properties.auto.offset.reset parameter to earliest or latest to specify the start offset for the first consumption.
scan.startup.mode
The start offset from which data is read from Kafka.
No
String
group-offsets
Valid values:
earliest-offset: starts reading from the earliest offset of each Kafka partition.
latest-offset: starts reading from the latest offset of each Kafka partition.
group-offsets (default value): starts reading from the offset committed by the consumer group specified by the properties.group.id parameter.
timestamp: starts reading from the timestamp specified by the scan.startup.timestamp-millis parameter.
specific-offsets: starts reading from the offsets specified by the scan.startup.specific-offsets parameter.
NoteThis parameter takes effect when the deployment is started without states. When the deployment is restarted from a checkpoint or resumed from the specified state, the deployment preferentially starts to read data from the progress that is saved in the state data.
scan.startup.specific-offsets
The start offset of each partition when the scan.startup.mode parameter is set to specific-offsets.
No
String
None
For example,
partition:0,offset:42;partition:1,offset:300
scan.startup.timestamp-millis
The timestamp of the start offset when the scan.startup.mode parameter is set to timestamp.
No
Long
None
Unit: milliseconds
scan.topic-partition-discovery.interval
The time interval at which the Kafka source checks for new partitions.
No
Duration
5 minutes
The default partition discovery interval is 5 minutes. To disable the dynamic partition discovery feature, you must explicitly set this parameter to a non-positive value. After the dynamic partition discovery feature is enabled, the Kafka source can automatically discover new partitions and read data from the partitions. In topic-pattern mode, the Kafka source reads data from new partitions of existing topics and data from all partitions of new topics that match the regular expression.
scan.check.duplicated.group.id
Specifies whether to check duplicate consumer groups that are specified by the
properties.group.id
parameter.No
Boolean
false
Valid values:
true: Checks duplicate consumer groups before a deployment starts. If duplicate consumer groups exist, an error is reported to prevent conflicts with existing consumer groups.
false: Does not check duplicate consumer groups before a deployment starts.
schema.inference.strategy
The strategy for schema inference.
No
String
continuous
Valid values:
continuous: The schema of each data record is inferred. If the schemas are incompatible, a wider schema is inferred and schema change events are generated.
static: Schema inference is performed only once when the deployment is started. Data is parsed based on the initial schema. Schema change events are not generated.
NoteFor more information about schema inference, see Schema inference and change synchronization policies.
Only Realtime Compute for Apache Flink that uses VVR 8.0.11 or later supports this parameter.
scan.max.pre.fetch.records
The maximum number of messages that the system attempts to consume and parse in a partition during initial schema inference.
No
Int
50
Before a deployment reads and processes data, the system attempts to consume a specific number of the latest messages in advance in a partition to initialize the schema information.
-
Debezium JSON-formatted parameters for source tables
Parameter | Required | Data type | Default value | Description |
debezium-json.distributed-tables | No | Boolean | false | Set this parameter to true if the data of a single table in Debezium JSON format is stored in multiple partitions. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.11 or later supports this parameter. Important: After you configure this parameter, you must start the deployment without states. |
debezium-json.schema-include | No | Boolean | false | Specifies whether the schema is included in the Debezium JSON message. When you configure Debezium Kafka Connect, you can set the value.converter.schemas.enable option of Kafka configurations to true based on your business requirements so that the schema information is included in the message body. Valid values: true: The schema is included in the Debezium JSON message. false: The schema is not included in the Debezium JSON message. |
debezium-json.ignore-parse-errors | No | Boolean | false | Valid values: true: The current row is skipped if a parsing exception occurs. false: An error is returned and the deployment fails to start. This is the default value. |
debezium-json.infer-schema.primitive-as-string | No | Boolean | false | Specifies whether to infer all data types as the STRING type when the table schema is parsed. Valid values: true: All basic types are inferred as the STRING type. false: Data types are inferred based on the data type mappings. This is the default value. |
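As an illustration only, the following sketch shows how the format options in the preceding table might be set on a Kafka source that reads Debezium JSON data. It assumes that the messages embed the schema (value.converter.schemas.enable=true in Kafka Connect) and that data of one table is spread across multiple partitions (VVR 8.0.11 or later); the broker address and topic are placeholders.
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: <yourKafkaBrokers>  # placeholder
  topic: <yourDebeziumTopic>                        # placeholder
  value.format: debezium-json
  # The messages carry the schema in their body.
  debezium-json.schema-include: true
  # Skip rows that cannot be parsed instead of failing the deployment.
  debezium-json.ignore-parse-errors: true
  # Data of a single table is distributed across multiple partitions (VVR 8.0.11 or later);
  # the deployment must then be started without states.
  debezium-json.distributed-tables: true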
-
Canal JSON-formatted parameters for source tables
Parameter | Required | Data type | Default value | Description |
canal-json.distributed-tables | No | Boolean | false | Set this parameter to true if the data of a single table in Canal JSON format is stored in multiple partitions. Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.11 or later supports this parameter. Important: After you configure this parameter, you must start the deployment without states. |
canal-json.database.include | No | String | None | An optional regular expression that matches the database metadata field in Canal records. Only the changelogs of the matched databases are read. The regular expression is compatible with Pattern in Java. |
canal-json.table.include | No | String | None | An optional regular expression that matches the table metadata field in Canal records. Only the changelogs of the matched tables are read. The regular expression is compatible with Pattern in Java. |
canal-json.ignore-parse-errors | No | Boolean | false | Valid values: true: The current row is skipped if a parsing exception occurs. false: An error is returned and the deployment fails to start. This is the default value. |
canal-json.infer-schema.primitive-as-string | No | Boolean | false | Specifies whether to infer all data types as the STRING type when the table schema is parsed. Valid values: true: All basic types are inferred as the STRING type. false: Data types are inferred based on the data type mappings. This is the default value. |
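Similarly, the following sketch, with hypothetical regular expressions and placeholder connection values, shows how the Canal JSON options can be used to read only the changelogs of selected databases and tables.
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: <yourKafkaBrokers>  # placeholder
  topic: <yourCanalTopic>                           # placeholder
  value.format: canal-json
  # Read only the changelogs of databases and tables that match these Java-compatible
  # regular expressions (hypothetical names used for illustration).
  canal-json.database.include: order_db.*
  canal-json.table.include: orders.*
  # Infer every basic type as STRING when the table schema is parsed.
  canal-json.infer-schema.primitive-as-string: true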
-
Sink table
Parameter | Description | Required | Data type | Default value | Remarks |
type | The type of the sink system. | Yes | String | None | Set the value to kafka. |
name | The name of the sink. | No | String | None | None |
key.format | The format that is used to write the key fields of Kafka messages. | No | String | None | Valid values: csv and json. |
topic | The name of the Kafka topic. | No | String | None | If this parameter is specified, all data is written to this topic. Note: If this parameter is not specified, each data record is written to a topic named after its TableID, which is generated by concatenating the database name and table name with a period (.), for example, databaseName.tableName. |
partition.strategy | The Kafka partitioning strategy. | No | String | all-to-zero | Valid values: all-to-zero: Writes all data to partition 0. This is the default value. hash-by-key: Writes data to partitions based on the hash value of the primary key. This ensures that data with the same primary key is written to the same partition and kept in order. |
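For example, the following sketch of a sink block writes all tables to a single topic and keeps records with the same primary key in order; the broker address and topic name are placeholders. If topic were omitted, each record would instead be written to the topic that corresponds to its TableID.
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: <yourKafkaBrokers>  # placeholder
  # Write the key fields of Kafka messages in the JSON format.
  key.format: json
  # Write all data to this topic. If omitted, each record goes to the topic that
  # corresponds to its TableID, for example databaseName.tableName.
  topic: <yourTargetTopic>                          # placeholder
  # Route records by the hash of the primary key so that records with the same
  # primary key land in the same partition and stay in order.
  partition.strategy: hash-by-key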
Examples
-
Extract data from Kafka:
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  topic: ${kafka.topic}
  value.format: ${value.format}
  scan.startup.mode: ${scan.startup.mode}

sink:
  type: hologres
  name: Hologres sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
-
Ingest data into Kafka:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

route:
  - source-table: ${mysql.source.table}
    sink-table: ${kafka.topic}
In the route section, specify the name of the target Kafka topic.
By default, ApsaraMQ for Kafka does not enable the automatic topic creation feature. For more information, see Issues related to automatic topic creation. Before writing data into ApsaraMQ for Kafka, you must create the required topics. For more information, see Step 3: Create resources.
Schema inference and change synchronization policies
-
Partition message pre-consumption and table schema initialization
The Kafka connector maintains the schemas of all known tables. Before reading Debezium JSON or Canal JSON format data from Kafka, the Kafka connector attempts to pre-consume up to scan.max.pre.fetch.records messages in each partition and parse the schema of each record. Then, the Kafka connector merges the schemas to initialize the table schema information. Before data consumption, a table creation event is generated based on the initialized schema.
Note: For Debezium JSON-formatted and Canal JSON-formatted messages, the table information is carried in the messages themselves, so the pre-consumed scan.max.pre.fetch.records messages may contain data of multiple tables and the number of records pre-consumed for each table cannot be guaranteed. Partition message pre-consumption and table schema initialization are performed only once for each partition, before the actual consumption and processing of messages. If data of a table appears only later, the schema parsed from the first data record of that table is used as its initial schema, and pre-consumption and schema initialization are not performed again.
Important: Only Realtime Compute for Apache Flink that uses VVR 8.0.11 or later supports distributing data in the same table across multiple partitions. In this scenario, you must set the debezium-json.distributed-tables or canal-json.distributed-tables parameter to true.
-
Primary keys
-
For Canal JSON-formatted messages, the primary key of the table is defined based on the pkNames field in JSON.
-
For Debezium JSON-formatted messages, the primary key is not included in JSON. You can manually add a primary key to a table using a rule in the transform module:
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: key1, key2
-
-
Schema inference and schema changes
After the table schema is initialized, if the schema.inference.strategy parameter is set to static, the Kafka connector parses the value fields of each message based on the initial table schema. Schema change events are not generated. If the schema.inference.strategy parameter is set to continuous, the Kafka connector parses the value fields of each Kafka message and infers the physical columns of the message. Then, the Kafka connector compares the inferred schema with the current schema. If the inferred schema is inconsistent with the current schema, the Kafka connector attempts to merge the schemas and generate corresponding table schema change events. The following rules are used to merge schemas:
-
If specific physical columns obtained after parsing are not in the table schema, the Kafka connector automatically adds the columns to the table schema and generates a nullable column event.
-
If specific physical columns in the table schema are not obtained after parsing, the columns are retained and their values are set to NULL. No column deletion event is generated.
-
If specific physical columns obtained after parsing have the same name as columns in the current table schema but their data types differ, the following rules apply:
-
If the columns are of the same data type but different precision, the Kafka connector merges the columns of the larger precision and generates a column type change event.
-
If the columns are of different data types, the Kafka connector uses the smallest parent node in the tree structure as the type of the columns with the same name and generates a column type change event. The following figure shows the tree structure.
-
The following schema change policies are supported:
-
Add a column: The connector automatically adds the new column to the end of the schema and synchronizes the data of the new column. The new column is automatically set to a nullable column.
-
Delete a column: The connector automatically fills the nullable column in the destination table with null values instead of generating a column deletion event.
-
Rename a column: The operation of renaming a column involves column addition and column deletion. After a column is renamed in a table, the column with the new name is added to the end of the schema and the column with the original name is filled with null values.
-
Change the data type of a column:
-
In a YAML deployment for data ingestion, the Kafka connector synchronizes a column type change, such as a change from INT to BIGINT, only if the downstream sink supports that change. Whether a downstream sink supports a specific column type change depends on its column type change rules, which vary by sink. For more information about the column type change rules supported by a type of sink table, see the documentation for the related sink table.
-
If a downstream sink, such as Hologres, does not support column type changes, you can use wide type mapping. When the deployment starts, a table whose columns use wider data types is created in the downstream system. When a column type change occurs, the system only needs to check whether the wider downstream type can accept the changed type, so the change can still be supported (see the sketch after this list).
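The following sketch, reusing the Hologres sink from the earlier example with placeholder connection values, shows wide type mapping enabled through sink.type-normalize-strategy: BROADEN so that destination columns are created with wider types.
sink:
  type: hologres
  name: Hologres sink
  endpoint: <yourEndpoint>                          # placeholder
  dbname: <yourDbname>                              # placeholder
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Create destination columns with wider types so that source column type changes,
  # such as INT to BIGINT, can be accepted without a column type change in Hologres.
  sink.type-normalize-strategy: BROADEN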
-
The following schema changes are not supported:
-
Change of constraints, such as the primary key or index.
-
Change from not null to nullable.
-
Usage notes of Semantic.EXACTLY_ONCE
-
When you use the transaction mechanism to write data into a Kafka topic, you must configure the isolation.level parameter for all Kafka consumers that read from that topic (see the sketch after this list). Valid values:
-
read_committed: Only committed data is read.
-
read_uncommitted: Data that is not committed can be read. This is the default value.
-
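For example, the following minimal sketch configures a downstream Kafka source to read only committed data. It assumes that Kafka consumer client options can be passed through the properties. prefix, as with properties.group.id elsewhere in this topic; the broker address and topic are placeholders.
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: <yourKafkaBrokers>  # placeholder
  topic: <yourTopic>                                # placeholder
  value.format: canal-json
  # Read only data from committed transactions; the Kafka client default is read_uncommitted.
  properties.isolation.level: read_committed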
-
A Kafka sink operating in DeliveryGuarantee.EXACTLY_ONCE mode relies on the ability to commit, after a deployment is recovered from a checkpoint, the transactions that were started before that checkpoint was taken. If the duration between the time a job fails and the time the job is restarted exceeds the specified transaction timeout period, data may be lost because Kafka automatically terminates transactions that time out. Therefore, we recommend that you configure a transaction timeout period longer than the estimated job downtime.
By default, the transaction.max.timeout.ms parameter of Kafka brokers is set to 15 minutes. The transaction timeout you configure for a Kafka producer cannot exceed the transaction timeout setting for Kafka brokers. The Kafka sink sets the transaction.timeout.ms parameter in sink configuration to 1 hour by default. Therefore, you must extend the transaction timeout period for Kafka brokers before using the DeliveryGuarantee.EXACTLY_ONCE mode.
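As a sketch only, the following sink block keeps the producer-side transaction timeout within the broker-side limit. It assumes that Kafka producer client options can be passed through the properties. prefix and that the exactly-once delivery guarantee mentioned earlier in this topic applies to this sink; the values are for illustration.
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: <yourKafkaBrokers>  # placeholder
  # Assumed option, as referenced earlier in this topic for the Kafka connector.
  sink.delivery-guarantee: exactly-once
  # Keep the producer transaction timeout (in milliseconds) no larger than the broker-side
  # transaction.max.timeout.ms, which defaults to 15 minutes; alternatively, raise the
  # broker limit and keep the sink default of 1 hour.
  properties.transaction.timeout.ms: 900000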
-
DeliveryGuarantee.EXACTLY_ONCE mode uses a fixed-size pool for each Kafka sink. One producer from this pool is used per checkpoint. If the number of parallel checkpoints exceeds the pool size, the Kafka producer throws an exception causing the job to fail. You must configure a maximum pool size and the maximum number of concurrent checkpoints accordingly.
-
When a deployment starts, the Kafka sink operating in DeliveryGuarantee.EXACTLY_ONCE mode attempts to terminate transactions that were not committed during checkpointing, because such uncommitted transactions may block consumers from reading data. However, if a deployment fails before the first checkpoint is taken, the information about the previous producer pool sizes is lost after the deployment is restarted. Therefore, it is unsafe to reduce the deployment parallelism before the first checkpoint is taken. If you must do so, ensure that the value obtained by dividing the original parallelism by the new parallelism does not exceed the value of FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.
-
For a Kafka consumer whose isolation.level is set to read_committed, any unfinished transaction (neither terminated nor completed) blocks all reads from the Kafka topic past the position of that transaction. For example, assume the following sequence of events:
-
A user creates a transaction to write data to a topic.
-
The user creates another transaction to write data to the topic.
-
The user commits the second transaction.
The Kafka consumer cannot read the data from the second transaction until the first transaction is committed or terminated. Therefore:
-
Data written to a Kafka topic becomes visible to read_committed consumers with a delay that is approximately equal to the average checkpointing interval.
-
In the event of deployment failure, the topics written by the deployment block Kafka consumers from reading data until the deployment is restarted or the transactions time out.
-