This topic describes the parameters that are supported by DataHub Writer and how to configure DataHub Writer by using the codeless user interface (UI) and code editor.

DataHub is a real-time data distribution platform that is designed to process streaming data. You can publish and subscribe to streaming data in DataHub and distribute the data to other platforms. This allows you to analyze streaming data and build applications based on the streaming data.

DataHub is built on top of the Apsara distributed operating system, and features high availability, low latency, high scalability, and high throughput. DataHub is seamlessly integrated with Realtime Compute for Apache Flink, and allows you to use SQL statements to analyze streaming data. DataHub can also distribute streaming data to Alibaba Cloud services, such as MaxCompute and Object Storage Service (OSS).
Important: Strings must be encoded in UTF-8. The size of each string must not exceed 1 MB.

Channel types

The source is connected to the sink by using a single channel. Therefore, the channel type that is configured for the writer must be the same as the channel type that is configured for the reader. Channels are categorized into two types: memory and file. In the following configuration, the channel type is set to file:
"agent.sinks.dataXSinkWrapper.channel": "file"

Parameters

Parameter | Description | Required | Default value
accessId | The AccessKey ID of the account that you use to connect to DataHub. | Yes | No default value
accessKey | The AccessKey secret of the account that you use to connect to DataHub. | Yes | No default value
endPoint | The endpoint of DataHub. | Yes | No default value
maxRetryCount | The maximum number of retries if the synchronization node fails. | No | No default value
mode | The mode for writing strings. | Yes | No default value
parseContent | The data to be parsed. | Yes | No default value
project | The basic organizational unit of data in DataHub. Each project has one or more topics. Note: DataHub projects are independent of MaxCompute projects. You cannot use MaxCompute projects as DataHub projects. | Yes | No default value
topic | The minimum unit for data subscription and publishing. You can use topics to distinguish different types of streaming data. | Yes | No default value
maxCommitSize | The maximum amount of buffered data that Data Integration can accumulate before it commits the data to the destination. Unit: bytes. You can specify this parameter to improve write efficiency. DataHub allows a maximum of 10,000 data records to be written in a single request. If the number of data records exceeds 10,000, the synchronization node fails. The maximum amount of data that can be written in a single request is calculated by using the following formula: Average size of a single data record × 10,000. Set maxCommitSize to a value less than the calculated amount to ensure that no more than 10,000 data records are written in a single request. | No | 1048576 (1 MB)
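The following is a minimal sketch of a writer parameter block that specifies the parameters in the preceding table directly, in the same style as the code editor example that appears later in this topic. All values are placeholders; the endpoint shown in the comment is illustrative, and you must set mode and parseContent based on your data:

{
    "stepType": "datahub",
    "parameter": {
        "endPoint": "",// The endpoint of DataHub, for example, https://dh-cn-hangzhou.aliyuncs.com (illustrative).
        "accessId": "",// The AccessKey ID of the account.
        "accessKey": "",// The AccessKey secret of the account.
        "project": "",// The DataHub project.
        "topic": "",// The DataHub topic.
        "mode": "",// The mode for writing strings. Set this based on your data.
        "parseContent": "",// The data to be parsed. Set this based on your data.
        "maxRetryCount": 500,// Optional. The maximum number of retries.
        "maxCommitSize": 1048576// Optional. Default: 1048576 bytes (1 MB).
    },
    "name": "Writer",
    "category": "writer"
}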

Configure DataHub Writer by using the codeless UI

  1. Configure data sources.
    Configure Source and Target for the synchronization node.
    Parameter | Description
    Connection | The name of the data source to which you want to write data.
    Topic | This parameter is equivalent to the topic parameter that is described in the preceding section.
    maxCommitSize | The maximum amount of buffered data that is committed to DataHub in a single request. Unit: bytes.
    maxRetryCount | This parameter is equivalent to the maxRetryCount parameter that is described in the preceding section.
  2. Configure field mappings. This operation is equivalent to setting the column parameter when you use the code editor. Fields in the source on the left have a one-to-one mapping with fields in the destination on the right. A sketch of the equivalent column setting appears after this list.
    Operation | Description
    Map Fields with the Same Name | Click Map Fields with the Same Name to establish mappings between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line | Click Map Fields in the Same Line to establish mappings between fields in the same row. The data types of the fields must match.
    Delete All Mappings | Click Delete All Mappings to remove all the mappings that are established.
    Auto Layout | Click Auto Layout to automatically sort the fields based on specific rules.
  3. Configure channel control policies.
    Parameter | Description
    Expected Maximum Concurrency | The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI.
    Bandwidth Throttling | Specifies whether to enable throttling. You can enable throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    Dirty Data Records Allowed | The maximum number of dirty data records allowed.
    Distributed Execution | The distributed execution mode allows you to split your node into pieces and distribute them to multiple Elastic Compute Service (ECS) instances for parallel execution. This speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.
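As referenced in step 2, the following is a minimal sketch of the equivalent column setting in the code editor. The data source, topic, and field names are hypothetical placeholders, and the sketch assumes that column lists the destination field names in the order of the mapping:

{
    "stepType": "datahub",
    "parameter": {
        "datasource": "my_datahub_source",// Hypothetical data source name.
        "topic": "my_topic",// Hypothetical topic name.
        "column": [// Assumption: the destination fields, in the same order as the mapped source fields.
            "uid",
            "name",
            "event_time"
        ]
    },
    "name": "Writer",
    "category": "writer"
}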

Configure DataHub Writer by using the code editor

In the following code, a synchronization node is configured to write data from memory to DataHub by using the code editor. For more information, see Configure a batch synchronization node by using the code editor.
{
    "type": "job",
    "version": "2.0",// The version number. 
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "datahub",// The writer type. 
            "parameter": {
                "datasource": "",// The name of the data source to which you want to write data. 
                "topic": "",// The minimum unit for data subscription and publishing. You can use topics to distinguish different types of streaming data. 
                "maxRetryCount": 500,// The maximum number of retries if the synchronization node fails. 
                "maxCommitSize": 1048576// The maximum amount of the buffered data that Data Integration can accumulate before it commits the data to the destination. Unit: bytes. 
                 // DataHub allows for a maximum of 10,000 data records to be written in a single request. If the number of data records exceeds 10,000, the synchronization node fails. In this case, the maximum amount of data that can be written in a single request is calculated by using the following formula: Average size of a single data record × 10,000. Set maxCommitSize to a value less than the calculated amount. This ensures that the number of data records to be written in a single request does not exceed 10,000. For example, if the size of a single data record is 10 KB, the value of this parameter must be less than the result of 10 KB multiplied by 10,000 (that is, 100,000 KB). 
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""// The maximum number of dirty data records allowed. 
        },
        "speed": {
            "throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent":20, // The maximum number of parallel threads. 
            "mbps":"12"// The maximum transmission rate.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

FAQ

What do I do if data fails to be written to DataHub because the amount of data that I want to write to DataHub at a time exceeds the upper limit?

Reduce the value of the maxCommitSize parameter. This decreases the amount of data that Data Integration buffers before each commit and keeps the number of data records in a single write request within the 10,000-record limit that DataHub allows.
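For example, the following sketch shows a writer configuration with maxCommitSize halved from the default value. The data source and topic names are hypothetical placeholders; choose a value based on the average size of your records:

{
    "stepType": "datahub",
    "parameter": {
        "datasource": "my_datahub_source",// Hypothetical data source name.
        "topic": "my_topic",// Hypothetical topic name.
        "maxCommitSize": 524288// 512 KB, half of the default 1048576 bytes (1 MB).
    },
    "name": "Writer",
    "category": "writer"
}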