Alibaba Cloud DataHub is a streaming data processing platform. You can publish and subscribe to streaming data in DataHub and distribute the data to other platforms. This allows you to analyze streaming data and build applications based on the streaming data.

DataHub Reader reads data from DataHub by using the following version of the DataHub SDK for Java:
<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.9.1</version>
</dependency>

Parameters

Parameter Description Required
endpoint The endpoint of DataHub. Yes
accessId The AccessKey ID that is used to connect to DataHub. Yes
accessKey The AccessKey secret that is used to connect to DataHub. Yes
project The name of the DataHub project from which you want to read data. DataHub projects are the resource management units in DataHub for resource isolation and control. Yes
topic The name of the DataHub topic from which you want to read data. Yes
batchSize The number of data records to read at a time. Default value: 1024. No
beginDateTime The start time of data consumption. This parameter specifies the left boundary of a left-closed, right-open interval. Specify the start time in the yyyyMMddHHmmss format. This parameter can be used together with the scheduling parameters in DataWorks. Note: The beginDateTime and endDateTime parameters must be used in pairs. Yes
endDateTime The end time of data consumption. This parameter specifies the right boundary of a left-closed, right-open interval. Specify the end time in the yyyyMMddHHmmss format. This parameter can be used together with the scheduling parameters in DataWorks. Note: The beginDateTime and endDateTime parameters must be used in pairs. Yes
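If you run the synchronization node on a recurring schedule, you do not need to hard-code the time boundaries. The following snippet is a minimal sketch of the time-related parameters populated by DataWorks scheduling parameters; ${startTime} and ${endTime} are hypothetical variable names that you would define in the scheduling properties of the node, for example as startTime=$[yyyymmddhh24miss-1/24] and endTime=$[yyyymmddhh24miss].

"beginDateTime": "${startTime}", // Replaced with the value of the startTime scheduling parameter at run time. 
"endDateTime": "${endTime}" // Replaced with the value of the endTime scheduling parameter at run time. 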

Configure DataHub Reader by using the codeless UI

  1. Configure data sources.
    Configure Source and Target for the synchronization node.
    Parameter Description
    Connection The name of the data source from which you want to read data.
    Topic This parameter is equivalent to the topic parameter that is described in the preceding section.
    consumeBeginDateTime This parameter is equivalent to the beginDateTime parameter that is described in the preceding section.
    consumeEndDateTime This parameter is equivalent to the endDateTime parameter that is described in the preceding section.
    Number of batches This parameter is equivalent to the batchSize parameter that is described in the preceding section.
  2. Configure field mappings. This operation is equivalent to setting the column parameter in the code editor. A sketch of a column configuration appears after this list.
    Fields in the source on the left have a one-to-one mapping with fields in the destination on the right. You can click Add to add a field. To remove an added field, move the pointer over the field and click the Remove icon.
    Operation Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish mappings between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish mappings between fields in the same row. The data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove the mappings that are established.
    Auto Layout Click Auto Layout. Then, the system automatically sorts the fields based on specific rules.
    Change Fields Click the Change Fields icon. In the Change Fields dialog box, you can manually edit the fields in the source table. Each field occupies a row. The first and the last blank rows are included, whereas other blank rows are ignored.
    Add Click Add to add a field. Take note of the following rules when you add a field:
    • You can enter constants. Each constant must be enclosed in single quotation marks ('), such as 'abc' and '123'.
    • You can use scheduling parameters, such as ${bizdate}.
    • You can enter functions that are supported by relational databases, such as now() and count(1).
    • If the field that you entered cannot be parsed, the value of Type for the field is Unidentified.
  3. Configure channel control policies.
    Parameter Description
    Expected Maximum Concurrency The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to prevent heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    Dirty Data Records Allowed The maximum number of dirty data records that are allowed.
    Distributed Execution Specifies whether to enable the distributed execution mode. This mode splits your node into slices and distributes them to multiple Elastic Compute Service (ECS) instances for parallel execution, which speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in this mode, excessive access requests are sent to the data sources. Therefore, evaluate the access load on the data sources before you enable this mode. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.
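The field mappings that you configure in step 2 correspond to the column parameter in the code editor. The following snippet is a minimal sketch of a column configuration that combines a topic field, a constant, and a scheduling parameter; the values 'abc' and ${bizdate} are illustrative only.

"column": [
    "col0",        // A field in the DataHub topic. 
    "'abc'",       // A constant. Constants must be enclosed in single quotation marks. 
    "${bizdate}"   // A scheduling parameter that is replaced at run time. 
]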

Configure DataHub Reader by using the code editor

In the following code, a synchronization node is configured to read data from DataHub. For more information about how to configure a synchronization node by using the code editor, see Create a synchronization node by using the code editor.
{
    "job": {
         "content": [
            {
                "reader": {
                    "name": "DataHubreader",
                    "parameter": {
                        "endpoint": "xxx" // The endpoint of DataHub. 
                        "accessId": "xxx", // The AccessKey ID that is used to connect to DataHub. 
                        "accessKey": "xxx", // The AccessKey secret that is used to connect to DataHub. 
                        "project": "xxx", // The name of the DataHub project from which you want to read data. 
                        "topic": "xxx" // The name of the DataHub topic from which you want to read data. 
                        "batchSize": 1000, // The number of data records to read at a time. 
                        "beginDateTime": "20180910111214", // The start time of data consumption. 
                        "endDateTime": "20180910111614", // The end time of data consumption. 
                        "column": [
                            "col0",
                            "col1",
                            "col2",
                            "col3",
                            "col4"
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                    }
                }
            }
        ]
    }
}
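In the code editor, the channel control policies that are described in the preceding section are configured in the setting object, which is placed at the same level as content within job. The following snippet is a minimal sketch of such a configuration; the values are examples, not recommendations.

"setting": {
    "errorLimit": {
        "record": "0" // The maximum number of dirty data records allowed. 
    },
    "speed": {
        "throttle": true, // Specifies whether to enable bandwidth throttling. 
        "concurrent": 1, // The maximum number of parallel threads. 
        "mbps": "12" // The maximum transmission rate. This parameter takes effect only if throttle is set to true. 
    }
}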