Use Kafka as a source for real-time synchronization to continuously read data from a message queue and write it to a destination. This topic shows you how to configure the Kafka Reader component.
Prerequisites
- Kafka Reader supports Alibaba Cloud Kafka and self-managed Kafka versions 0.10.2 to 2.2.x, inclusive.
- Kafka versions earlier than 0.10.2 are not recommended. These versions do not support retrieving partition offsets, and their records may not carry timestamps. As a result, latency statistics for the synchronization task can be inaccurate and offsets cannot be reset correctly.
For details on how to configure a Kafka data source, see Configure a Kafka data source.
Procedure
- Log on to the DataWorks console. In the target region, click in the left-side navigation pane. Select a workspace from the drop-down list and click Go to Data Development.
- Hover over the icon and choose . Alternatively, expand a scheduled workflow, right-click it, and choose .
- In the Create Node dialog box, set Synchronization Method to Single Table (Topic) to Single Table (Topic) ETL, enter a Name, and select a Path.
- Click Confirm.
- On the configuration page of the real-time synchronization node, choose and drag it to the editing canvas.
- Click the Kafka node and configure its parameters in the Node Configuration dialog box.
Parameter
Description
Data Source
Select a configured Kafka data source. Only Kafka data sources are supported. If no data source is configured, click New data source on the right to create one. For more information, see Configure a Kafka data source.
Topic
The name of the topic from which to read messages.
A topic is a category used to group related messages within a Kafka cluster.
Note: A Kafka Reader can read from only one topic.
Key Type
The type of the Kafka message key. It determines the key.deserializer setting used to initialize the KafkaConsumer. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.
Value Type
The type of the Kafka message value. It determines the value.deserializer setting used to initialize the KafkaConsumer. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.
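As a rough illustration of how Key Type and Value Type relate to consumer settings, the Python sketch below maps each option to a deserializer class from the Apache Kafka Java client (package org.apache.kafka.common.serialization). The mapping itself and the helper name deserializer_config are assumptions for illustration; this is not the DataWorks implementation.

```python
# Hypothetical sketch: map the Key Type / Value Type options to the standard
# deserializer classes of the Apache Kafka Java client. How DataWorks performs
# this mapping internally is an assumption.
_PKG = "org.apache.kafka.common.serialization."

DESERIALIZERS = {
    "STRING": _PKG + "StringDeserializer",
    "BYTEARRAY": _PKG + "ByteArrayDeserializer",
    "DOUBLE": _PKG + "DoubleDeserializer",
    "FLOAT": _PKG + "FloatDeserializer",
    "INTEGER": _PKG + "IntegerDeserializer",
    "LONG": _PKG + "LongDeserializer",
    "SHORT": _PKG + "ShortDeserializer",
}


def deserializer_config(key_type: str, value_type: str) -> dict:
    """Return key.deserializer / value.deserializer properties for a consumer."""
    try:
        return {
            "key.deserializer": DESERIALIZERS[key_type.upper()],
            "value.deserializer": DESERIALIZERS[value_type.upper()],
        }
    except KeyError as err:
        # Any type outside the seven listed options is rejected.
        raise ValueError(f"unsupported type: {err.args[0]}") from None


print(deserializer_config("STRING", "BYTEARRAY"))
```

Rejecting unknown names up front mirrors the fact that only the seven listed types are valid for either setting.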
Output Mode
Defines how Kafka records are parsed.
- Single-row output: Parses a Kafka record as an unstructured string or a JSON object. Each Kafka record produces one output record.
- Multi-row output: Parses a Kafka record as a JSON array. Each element of the array produces one output record, so one Kafka record can produce multiple output records.
Note: This parameter is available only in some regions. If you do not see it, wait for the feature to be released in your region.
Path of Array
When Output Mode is set to Multi-row output, specify the path to the JSON array within the value of the Kafka record. Use the a.a1 format to reference a field in a JSON object, or the a[0].a1 format to reference a field inside an element of a JSON array. If this parameter is empty, the entire value of the Kafka record is parsed as a JSON array.
The target JSON array must be an array of objects, such as [{"a":"hello"},{"b":"world"}]. It cannot be an array of primitive values such as numbers or strings, for example ["a","b"].
Configuration parameters
When the KafkaConsumer is created, you can add extended parameters to kafkaConfig to customize how it reads data. For the complete list of parameters supported by each Kafka cluster version, see the official Kafka documentation for that version.
- Common parameters:
  - bootstrap.servers
  - auto.commit.interval.ms
  - session.timeout.ms
- group.id:
  - Default behavior: By default, a real-time synchronization task sets the group.id of the KafkaConsumer to a randomly generated string.
  - Manual configuration: You can specify a fixed group.id. You can then use that consumer group to monitor the task's consumption offsets in the Kafka cluster.
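The group.id behavior described above can be sketched as follows: extended kafkaConfig entries are merged into the consumer properties, and a random group.id is generated only when none is supplied. The function name build_consumer_config, the merge order, and the use of a UUID as the random string are assumptions; the broker address and group name are placeholders.

```python
import uuid
from typing import Optional


def build_consumer_config(bootstrap_servers: str,
                          kafka_config: Optional[dict] = None) -> dict:
    """Merge user-supplied kafkaConfig entries into consumer properties.

    Hypothetical sketch: a fixed group.id in kafka_config is kept so the
    consumer group can be monitored in Kafka; otherwise a random one is used.
    """
    config = {"bootstrap.servers": bootstrap_servers}
    # Extended parameters from kafkaConfig override the defaults above.
    config.update(kafka_config or {})
    # Default behavior: fall back to a randomly generated consumer group ID.
    config.setdefault("group.id", uuid.uuid4().hex)
    return config


# Placeholder broker address and group name, for illustration only.
fixed = build_consumer_config(
    "broker1:9092",
    {"group.id": "dw_rt_sync_orders", "session.timeout.ms": "30000"},
)
print(fixed["group.id"])  # dw_rt_sync_orders

random_cfg = build_consumer_config("broker1:9092")
print(random_cfg["group.id"])  # a random 32-character hex string
```

With a random group.id, the consumer group changes with each configuration, which is why a fixed value is needed if you want to track offsets from the Kafka side.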
Output Field
Define the fields to extract from each Kafka message.
- To add a custom field, click Add more fields, enter a Field, and select a Value.
  Assignment Method specifies how a field's value is extracted from a Kafka record. Click the icon on the right to switch between the two extraction methods:
  - Preset methods: Six built-in methods extract values from a Kafka record:
    - value: The message body.
    - key: The message key.
    - partition: The partition number.
    - offset: The message offset.
    - timestamp: The message timestamp, in milliseconds.
    - headers: The message headers.
  - JSON parsing: Use dot notation (.) to access sub-fields and bracket notation ([]) to access array elements when extracting values from complex JSON. For backward compatibility, you can also use the legacy double-underscore syntax (for example, __value__) to extract metadata or the entire message body of a Kafka record. The following is an example of Kafka data:
      {
        "a": { "a1": "hello" },
        "b": "world",
        "c": [ "xxxxxxx", "yyyyyyy" ],
        "d": [
          { "AA": "this", "BB": "is_data" },
          { "AA": "that", "BB": "is_also_data" }
        ]
      }
    The output field values for different scenarios are as follows:
    - To synchronize the value of a Kafka record, enter value as the Assignment Method.
    - To synchronize the key of a Kafka record, enter key as the Assignment Method.
    - To synchronize the partition of a Kafka record, enter partition as the Assignment Method.
    - To synchronize the offset of a Kafka record, enter offset as the Assignment Method.
    - To synchronize the timestamp of a Kafka record, enter timestamp as the Assignment Method.
    - To synchronize the headers of a Kafka record, enter headers as the Assignment Method.
    - To synchronize "hello" from a1, enter a.a1 as the Assignment Method.
    - To synchronize "world" from b, enter b as the Assignment Method.
    - To synchronize "yyyyyyy" from c, enter c[1] as the Assignment Method.
    - To synchronize "this" from AA, enter d[0].AA as the Assignment Method.
- To delete a field, hover over it and click the icon that appears.
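To make the extraction rules concrete, the following Python sketch applies the preset methods and dot/bracket paths to the sample data above. The KafkaRecord class, the extract and resolve_path helpers, and the rule that preset names take precedence over JSON field names are hypothetical; the legacy double-underscore syntax is not covered.

```python
import json
import re
from dataclasses import dataclass, field
from typing import Any, Dict

# The six preset Assignment Methods listed above.
PRESETS = ("value", "key", "partition", "offset", "timestamp", "headers")

# Matches either a field name (a, a1, AA) or an array index such as [1].
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


@dataclass
class KafkaRecord:
    """Minimal stand-in for a consumed Kafka record (hypothetical)."""
    key: Any
    value: str  # message body; assumed to be a JSON string here
    partition: int
    offset: int
    timestamp: int  # milliseconds since the epoch
    headers: Dict[str, bytes] = field(default_factory=dict)


def resolve_path(data: Any, path: str) -> Any:
    """Follow dot (.) and bracket ([n]) notation, e.g. 'd[0].AA'."""
    for name, index in _TOKEN.findall(path):
        data = data[int(index)] if index else data[name]
    return data


def extract(record: KafkaRecord, method: str) -> Any:
    """Return a field value for one Assignment Method.

    Assumption: preset names are checked before JSON paths.
    """
    if method in PRESETS:
        return getattr(record, method)
    return resolve_path(json.loads(record.value), method)


sample = KafkaRecord(
    key="k1",
    value=json.dumps({
        "a": {"a1": "hello"},
        "b": "world",
        "c": ["xxxxxxx", "yyyyyyy"],
        "d": [{"AA": "this", "BB": "is_data"},
              {"AA": "that", "BB": "is_also_data"}],
    }),
    partition=0,
    offset=42,
    timestamp=1700000000000,
)

for method in ("a.a1", "b", "c[1]", "d[0].AA", "partition", "offset"):
    print(f"{method} -> {extract(sample, method)}")
# a.a1 -> hello
# b -> world
# c[1] -> yyyyyyy
# d[0].AA -> this
# partition -> 0
# offset -> 42
```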
Example scenario: If you set Output Mode to Multi-row output, the system first locates the JSON array at the specified Path of Array. It then takes each JSON object in the array and builds the output fields from the defined field names and Assignment Methods. Values are extracted the same way as in Single-row output mode: use . to access sub-fields and [] to access array elements of complex JSON. For example, the Kafka record contains the following data:
    {
      "c": {
        "c0": [
          { "AA": "this", "BB": "is_data" },
          { "AA": "that", "BB": "is_also_data" }
        ]
      }
    }
If Path of Array is set to c.c0 and you define two output fields, AA with the Assignment Method AA and BB with the Assignment Method BB, the Kafka record is parsed into two records: one with AA=this and BB=is_data, and the other with AA=that and BB=is_also_data.
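The multi-row example above can be sketched in Python as follows. The explode, resolve_path, and safe_resolve helpers are hypothetical, and returning None for a field that is missing from an array element is an assumption rather than documented DataWorks behavior.

```python
import json
import re
from typing import Any, Dict, List

# Matches either a field name or an array index such as [0].
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def resolve_path(data: Any, path: str) -> Any:
    """Follow dot/bracket notation; an empty path returns data unchanged."""
    for name, index in _TOKEN.findall(path):
        data = data[int(index)] if index else data[name]
    return data


def safe_resolve(data: Any, path: str) -> Any:
    """Like resolve_path, but yields None when the path does not exist."""
    try:
        return resolve_path(data, path)
    except (KeyError, IndexError, TypeError):
        return None


def explode(value: str, array_path: str,
            fields: Dict[str, str]) -> List[Dict[str, Any]]:
    """Turn one Kafka record value into one output row per array element."""
    array = resolve_path(json.loads(value), array_path)
    # Path of Array must point to an array of objects, not primitives.
    if not isinstance(array, list) or not all(isinstance(e, dict) for e in array):
        raise ValueError("Path of Array must point to a JSON array of objects")
    return [
        {name: safe_resolve(element, method) for name, method in fields.items()}
        for element in array
    ]


record_value = ('{"c": {"c0": [{"AA": "this", "BB": "is_data"}, '
                '{"AA": "that", "BB": "is_also_data"}]}}')
rows = explode(record_value, "c.c0", {"AA": "AA", "BB": "BB"})
print(rows)
# [{'AA': 'this', 'BB': 'is_data'}, {'AA': 'that', 'BB': 'is_also_data'}]
```

Passing an empty array_path makes the whole record value the array, which matches the documented behavior when Path of Array is left empty.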
- Click the icon on the toolbar.