Kafka Writer writes data to Kafka by using Kafka SDK for Java. This topic describes how Kafka Writer works, the parameters that are supported by Kafka Writer, and how to configure Kafka Writer by using the codeless user interface (UI) and code editor.

Important: Kafka Writer supports only exclusive resource groups for Data Integration. It does not support the shared resource group for Data Integration.

Apache Kafka is a fast, scalable, high-throughput, fault-tolerant, and distributed messaging system. This system is used to publish and subscribe to messages. Kafka provides built-in partitions, supports data replicas, and can be used to process a large number of messages.

How it works

Kafka Writer writes data to Kafka by using the following version of Kafka SDK for Java:
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.0.0</version>
</dependency>

Parameters

Parameter Description Required
datasource The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. Yes
server The address of a Kafka broker in your Kafka cluster. Specify the address in the format of IP address:Port number. Yes
topic The name of the Kafka topic to which you want to write data. Topics are categories in which Kafka maintains feeds of messages.

Each message that is published to a Kafka cluster is assigned to a topic. Each topic contains a group of messages.

Yes
valueIndex The sequence number of the column that is obtained from a reader and used as the value in the Kafka topic. If you leave this parameter empty, all columns obtained from the reader are concatenated by using the delimiter specified by the fieldDelimiter parameter to form the value. No
writeMode The write mode. If you leave the valueIndex parameter empty, you can use the writeMode parameter to specify the format in which Kafka Writer concatenates all columns obtained from the reader. The default value is text. Valid values:
  • text: Kafka Writer uses the delimiter specified by the fieldDelimiter parameter to concatenate all columns obtained from the reader.
  • JSON: Kafka Writer concatenates all columns obtained from the reader into a JSON string based on the column names specified by the column parameter.

For example, assume that three columns are obtained from the reader and that the values in the columns are a, b, and c. If you set the writeMode parameter to text and the fieldDelimiter parameter to a number sign (#), Kafka Writer concatenates the columns into the string a#b#c and writes this string to Kafka. If you set the writeMode parameter to JSON and the column parameter to [{"name":"col1"},{"name":"col2"},{"name":"col3"}], Kafka Writer concatenates the columns into the JSON string {"col1":"a","col2":"b","col3":"c"} and writes this JSON string to Kafka. A sketch of this concatenation logic is provided after the parameter descriptions.

If you configure the valueIndex parameter, the writeMode parameter does not take effect.

No
column The names of the columns to which you want to write data. Separate the names with commas (,), such as "column":["id","name","age"].

If you leave the valueIndex parameter empty and set the writeMode parameter to JSON, the column parameter determines the names of the columns in the JSON string into which the columns obtained from the reader are concatenated, such as "column":[{"name":"id"},{"name":"name"},{"name":"age"}].

  • If the number of columns that are obtained from the reader is greater than the number of columns that are specified in the column parameter, Kafka Writer truncates the columns that are obtained from the reader. Example:

    Three columns are obtained from the reader, and the values in the columns are a, b, and c. If the column parameter is set to [{"name":"col1"},{"name":"col2"}], Kafka Writer concatenates the columns into the JSON string {"col1":"a","col2":"b"} and writes this JSON string to Kafka.

  • If the number of columns that are obtained from the reader is less than the number of columns that are specified in the column parameter, Kafka Writer sets the values of the excess columns in Kafka to null or the string that is specified by the nullValueFormat parameter. Example:

    Two columns are obtained from the reader, and the values in the columns are a and b. If the column parameter is set to [{"name":"col1"},{"name":"col2"},{"name":"col3"}], Kafka Writer concatenates the columns into the JSON string {"col1":"a","col2":"b","col3":null} and writes this JSON string to Kafka.

If you configure the valueIndex parameter or set the writeMode parameter to text, the column parameter does not take effect.

Required if valueIndex is not configured and writeMode is set to JSON
partition The ID of the partition to which you want to write data. The value of this parameter must be an integer that is greater than or equal to 0. No
keyIndex The sequence number of the column that is obtained from the reader and used as the key in the Kafka topic.

The value of this parameter must be an integer that is greater than or equal to 0. A value less than 0 causes an error.

No
keyIndexes The sequence numbers of the columns that are obtained from the reader and used as the key in the Kafka topic.

The sequence numbers must start from 0 and be separated by commas (,), such as [0,1,2]. If you leave this parameter empty, the key in the Kafka topic is null, and data is written to the partitions in the Kafka topic in turn (round robin). You can configure either the keyIndex or keyIndexes parameter, but not both.

No
fieldDelimiter The column delimiter. If you set the writeMode parameter to text and leave the valueIndex parameter empty, Kafka Writer uses the column delimiter you specify for the fieldDelimiter parameter to concatenate all columns that are obtained from the reader to form the value. You can use a single character or multiple characters as the column delimiter. The characters can be Unicode characters (such as \u0001) or escape characters (such as \t or \n). Default value: \t.

If the writeMode parameter is not set to text or the valueIndex parameter is configured, the fieldDelimiter parameter does not take effect.

No
keyType The data type of the key in the Kafka topic. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT. Yes
valueType The data type of the value in the Kafka topic. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT. Yes
nullKeyFormat If the column specified in the keyIndex or keyIndexes parameter contains the value null, Kafka Writer replaces null with the value of the nullKeyFormat parameter. If you leave the nullKeyFormat parameter empty, Kafka Writer retains the value null. No
nullValueFormat If a column obtained from the reader contains the value null, Kafka Writer replaces null with the value of the nullValueFormat parameter. If you leave the nullValueFormat parameter empty, Kafka Writer retains the value null. No
acks The acknowledgment configuration used when the Kafka producer is initialized. This parameter specifies how the producer confirms that data is written to Kafka. Default value: all. Valid values:
  • 0: The Kafka producer does not wait for any acknowledgment that data is written to the destination.
  • 1: The Kafka producer considers a write successful after the data is written to the primary replica.
  • all: The Kafka producer considers a write successful only after the data is written to all replicas.
No

Configure Kafka Writer by using the codeless UI

  1. Configure data sources.
    Configure the source and destination for the synchronization node.
    Parameter Description
    Data source The name of the data source to which you want to write data. This parameter is equivalent to the datasource parameter described in the preceding section.
    Topic The name of the Kafka topic to which you want to write data. This parameter is equivalent to the topic parameter that is described in the preceding section.
    Key Columns The sequence numbers of the columns that are obtained from the reader and used as the key in the Kafka topic. This parameter is equivalent to the keyIndexes parameter that is described in the preceding section.
    Write Mode The write mode. This parameter is equivalent to the writeMode parameter that is described in the preceding section.
    Delimiter The column delimiter. This parameter is equivalent to the fieldDelimiter parameter that is described in the preceding section. If you set the Write Mode parameter to text, Kafka Writer uses the specified column delimiter to concatenate all columns that are obtained from the reader to form the value.
    Substitute For Null Key This parameter is equivalent to the nullKeyFormat parameter that is described in the preceding section.
    Substitute For Null Value This parameter is equivalent to the nullValueFormat parameter that is described in the preceding section.
    acks The acknowledgment configuration used when data is written to the Kafka topic. This parameter is equivalent to the acks parameter that is described in the preceding section.
    Batch Config Determines the values of the batch.size and linger.ms parameters used when the Kafka producer is initialized. The default values of the two parameters are 16384 and 10. You can configure this parameter to limit the amount of data that is written at a time.
    Timeout Determines the values of the timeout.ms, request.timeout.ms, and metadata.fetch.timeout.ms parameters used when the Kafka producer is initialized. The default values of the three parameters are 30000, 30000, and 60000. You can configure this parameter to limit the timeout period for a single write. These settings map to standard Kafka producer properties; see the sketch after this section.
  2. Configure field mappings. This operation is equivalent to setting the column parameter that is described in the preceding section. Fields in the source on the left have a one-to-one mapping with fields in the destination on the right.
    • If you set the Write Mode parameter to text, the specified column delimiter is used to concatenate all columns that are obtained from the reader to form the value.
    • If you set the Write Mode parameter to JSON, the names of columns concatenated into the JSON string in the destination are the names of columns that are read from the source. For example, two columns are obtained from the source, and the values in the columns are a and b. If the column names are col1 and col2, Kafka Writer concatenates the columns into the JSON string {"col1":"a","col2":"b"} and writes this JSON string to Kafka.
    Note The names of columns in the destination can contain only letters, digits, and underscores (_). Otherwise, Kafka Writer fails to write data.
    Operation Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish mappings between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish mappings between fields in the same row. The data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove the mappings that are established.
    Auto Layout Click Auto Layout. Then, the system automatically sorts the fields based on specific rules.
  3. Configure channel control policies.
    Parameter Description
    Expected Maximum Concurrency The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable throttling. You can enable throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.
    Distributed Execution The distributed execution mode allows you to split your node into pieces and distribute them to multiple Elastic Compute Service (ECS) instances for parallel execution, which speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.

Configure Kafka Writer by using the code editor

For more information about how to configure a synchronization node by using the code editor, see Configure a batch synchronization node by using the code editor.

In the following code, a synchronization node is configured to write data to Kafka.
{
    "type":"job",
    "version":"2.0",// The version number.
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"Kafka",// The writer type.
            "parameter":{
                "server": "ip:9092", // The address of a Kafka broker, in the format IP address:port number.
                "keyIndex": 0, // The sequence number of the column that is used as the key. Use lower camel case for the parameter name (keyIndex).
                "valueIndex": 1, // The sequence number of the column that is used as the value. You can specify only one column. If you leave this parameter empty, all columns obtained from the reader are used as the value.
                // If you want to use the second, third, and fourth columns in a MaxCompute table as the value, cleanse and integrate the data in the table. Create a MaxCompute table, write the processed data to the new table, and then use the new table to synchronize data.
                "keyType": "Integer", // The data type of the key in the Kafka topic.
                "valueType": "Short", // The data type of the value in the Kafka topic.
                "topic": "t08", // The name of the Kafka topic to which you want to write data.
                "batchSize": 1024 // The number of data records to write at a time.
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The maximum number of dirty data records allowed.
        },
        "speed":{
            "throttle":true,// Specifies whether to enable bandwidth throttling. The value false indicates that bandwidth throttling is disabled, and the value true indicates that bandwidth throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
            "concurrent":1, // The maximum number of parallel threads.
            "mbps":"12" // The maximum transmission rate.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Use SASL authentication

If you want to use SASL or SSL for authentication, configure the SASL or SSL authentication mode when you configure a Kafka data source. For more information, see Add a Kafka data source.