
Configure DataHub Writer

Last Updated: Nov 26, 2018

DataHub is a real-time data distribution platform designed to process streaming data. It lets you publish, subscribe to, and distribute streaming data, making it easy to analyze streaming data and build applications on top of it.

Based on Alibaba Cloud’s Apsara platform, DataHub delivers high availability, low latency, high scalability, and high throughput. Seamlessly connected to Alibaba Cloud’s stream computing engine, StreamCompute, DataHub allows you to use SQL to analyze streaming data. It can also distribute streaming data to various cloud products, such as MaxCompute (formerly known as ODPS) and OSS.

Note:

The String type supports UTF-8 encoding only, and a single String column can be at most 1 MB in length.

Parameter configuration

The source and sink are connected by a channel, so the channel type configured on the writer must match the one configured on the reader. The channel type can be either memory-channel or file-channel. The following configuration uses file-channel.

  1. "agent.sinks.dataXSinkWrapper.channel": "file"

Parameter description

  • accessId

    • Description: AccessID of DataHub

    • Required: Yes

    • Default value: None

  • accessKey

    • Description: AccessKey of DataHub.

    • Required: Yes

    • Default value: None

  • endpoint

    • Description: Endpoint of DataHub. For requests to access DataHub resources, select the correct domain name based on the service to which the resource belongs.

    • Required: Yes

    • Default value: None

  • maxRetryCount

    • Description: The maximum number of retries after a task failure.

    • Required: No

    • Default value: None

  • mode

    • Description: The write mode when the value type is string.

    • Required: Yes

    • Default value: None

  • parseContent

    • Description: Specifies how the content is parsed.

    • Required: Yes

    • Default value: None

  • project

    • Description: A project is the basic organizational unit of data in DataHub and contains multiple topics.

      Note:

      The projects of DataHub and MaxCompute are independent of each other. A project created in MaxCompute cannot be reused in DataHub; you must create a new project in DataHub.

    • Required: Yes

    • Default value: None

  • topic

    • Description: A topic is the smallest unit of subscription and publication in DataHub. You can use a topic to represent one class or type of streaming data.

    • Required: Yes

    • Default value: None

  • maxCommitSize

    • Description: To improve write efficiency, DataX-On-Flume buffers data and submits it to the destination in batches once the buffered data size reaches maxCommitSize. The default is 1,048,576 bytes (1 MB).

    • Required: No

    • Default value: 1,048,576 (1 MB)

  • batchSize

    • Description: To improve write efficiency, DataX-On-Flume buffers data and submits it to the destination in batches once the number of buffered entries reaches batchSize. The default is 1,024 entries.

    • Required: No

    • Default value: 1024

  • maxCommitInterval

    • Description: To improve write efficiency, DataX-On-Flume buffers data and submits it to the destination in batches when the maxCommitSize or batchSize threshold is reached. If the source produces no data for a long time, maxCommitInterval (in milliseconds) guarantees timely delivery: it is the maximum time data may stay in the buffer before it is forcibly flushed. The default is 30,000 (30 seconds).

    • Required: No

    • Default value: 30,000 (30 seconds)

  • parseMode

    • Description: The log parsing mode, which is either the default non-parsing mode or csv mode. In non-parsing mode, each collected log line is written directly as a single column of the DataX Record. In csv mode, you can configure a column separator that splits each log line into multiple columns of the DataX Record. (A sketch showing how these optional tuning parameters might be configured follows this list.)

    • Required: No

    • Default value: default
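
The following is a minimal sketch, not a complete job configuration, of how the optional tuning parameters above (maxCommitSize, batchSize, maxCommitInterval, and parseMode) might be added to the writer's "parameter" block, assuming they are set alongside the connection parameters as in the script-mode example below; the values shown are simply the documented defaults:

    "maxCommitSize": 1048576,    // Flush once the buffered data reaches 1 MB
    "batchSize": 1024,           // Flush once 1,024 entries have been buffered
    "maxCommitInterval": 30000,  // Flush after at most 30,000 ms (30 seconds) even if the thresholds above are not reached
    "parseMode": "default"       // Or "csv" to split each log line into multiple columns using a separator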

Development in script mode

Configure a synchronization job that reads data from memory and writes it to DataHub:

    {
      "configuration": {
        "reader": {},
        "writer": {
          "parameter": {
            "agent.sinks": "dataXSinkWrapper", // Sink name
            "agent.sinks.dataXSinkWrapper.channel": "memoryChannel", // Memory channel type
            "agent.sinks.dataXSinkWrapper.type": "", // Type path
            "accessId": "",
            "accessKey": "", // AccessKey (AK) information
            "endpoint": "", // DataHub endpoint
            "maxRetryCount": 500, // Number of retries
            "project": "", // Project created in DataHub
            "topic": "" // Topic created in DataHub
          },
          "plugin": "datahubwriter"
        }
      },
      "type": "stream",
      "version": "1.0"
    }
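
Before running the job, replace the empty accessId, accessKey, endpoint, project, and topic values with your own DataHub connection information described in the parameter list above. To use a file channel instead of the memory channel shown here, change the channel line to the file-channel form given in the Parameter configuration section.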

Tip: The Real-time Log feature is still in beta testing. To try it, open a ticket to our Data Integration team.
