
DataHub: Java high-level SDK

Last Updated: Oct 27, 2025

Overview

The Java High-Level SDK, commonly referred to as client-library, is primarily divided into Producer and Consumer components. This document introduces the related parameters and common usage patterns for both components.

Authentication

An AccessKey pair is provided by Alibaba Cloud and used to complete identity authentication when you call API operations. Each AccessKey pair consists of an AccessKey ID and an AccessKey secret. You must keep the AccessKey pair confidential: if it is leaked, the security of all resources in your account is threatened. If you hardcode the AccessKey pair in your code when you access Alibaba Cloud OpenAPI, the AccessKey pair may be leaked because of improper permission management of your code repository.

Alibaba Cloud Credentials is an identity credential management tool provided by Alibaba Cloud for developers. After configuring the Credentials default credential chain, you do not need to hardcode the AccessKey pair in your code when accessing Alibaba Cloud OpenAPI, which can effectively ensure the security of cloud resources in your account.

Prerequisites

Configuration options

This document uses environment variables to obtain AccessKey information. For more options, see Manage access credentials.

Important

When using the configuration file option, make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET do not exist in your system. Otherwise, the configuration file will not take effect.

Alibaba Cloud SDK supports creating default access credentials by defining the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. When you call an API operation, the system reads the AccessKey pair from the default credential and uses the AccessKey pair to complete authentication.

Configuration methods

Configure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.

Linux and macOS configuration method

Execute the following commands:

export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>

Replace <access_key_id> with your prepared AccessKey ID and <access_key_secret> with your AccessKey Secret.

Windows configuration method

  1. Create the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET and set them to your prepared AccessKey ID and AccessKey secret.

  2. Restart Windows so that the environment variables take effect. (A command-line alternative is sketched below.)
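
Alternatively, you can set the variables from a command prompt using the standard Windows setx command, as sketched below. Note that setx only affects new sessions, so open a new terminal (or sign out and back in) before running your program.

setx ALIBABA_CLOUD_ACCESS_KEY_ID "<access_key_id>"
setx ALIBABA_CLOUD_ACCESS_KEY_SECRET "<access_key_secret>"

As with the Linux and macOS commands, replace the placeholders with your AccessKey ID and AccessKey secret.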

Code example

EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

Producer overview

Limits

  • Producer is thread-safe. Theoretically, only one Producer is needed for a topic within the same process.

Parameter description

Producer parameters are set through ProducerConfig. For example, to set the maxAsyncThreadNum parameter, you need to call the setMaxAsyncThreadNum method of ProducerConfig.
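
For instance, a minimal configuration sketch follows. Only setMaxAsyncThreadNum is named in this document; the commented-out setter is an assumption that follows the same setXxx naming pattern.

// Get AccessKey information through environment variables
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

// Create the Producer configuration and override selected defaults
ProducerConfig config = new ProducerConfig("https://dh-cn-hangzhou.aliyuncs.com", provider);
config.setMaxAsyncThreadNum(32);   // documented setter: thread pool size for sending data
// config.setMaxRetryCount(3);     // assumed setter, following the setXxx naming pattern

DatahubProducer producer = new DatahubProducer("test_project", "test_topic", config);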

| Parameter name | Type | Required | Default value | Description |
| --- | --- | --- | --- | --- |
| maxAsyncThreadNum | int | No | 16 | Thread pool size for sending data. |
| userAgent | String | No | dcl-xxx | User agent. |
| maxRetryCount | int | No | 1 | Maximum number of retries. |
| maxRetryIntervalMs | int | No | 1000 | Retry interval for retriable errors, excluding throttling errors. |
| maxRetryIntervalMsForLimit | int | No | 100 | Retry interval after write throttling. |
| ProducerInterceptor | Object | No | - | Interceptor for additional processing when writing data, such as adding extra attribute information. |
| HttpConfig | Object | No | - | HTTP-related settings have many defaults; we recommend checking the code directly. |
| maxAsyncBufferRecords | int | No | INT_MAX | Maximum number of records per batch in asynchronous mode. Batching is usually controlled by size, so the default is INT_MAX. |
| maxAsyncBufferTimeMs | long | No | 10000 | Maximum cache time for asynchronous sending. |
| maxAsyncBufferSize | long | No | 4 * 1024 * 1024 | Maximum batch size for asynchronous sending. |
| maxAsyncQueueSize | long | No | 16 | Number of batched requests that can be in flight in asynchronous mode. Exceeding this blocks the send interface; the limit mainly prevents out-of-memory errors. |
| useTTFormat |  |  |  |  |
| enableHeartbeat | bool | No | false | Whether to send heartbeat packets. Generally not needed. |
| heartbeatGenerator | Object | No | DefaultBlobHeartbeatGenerator | If heartbeat sending is enabled, a user-configured heartbeatGenerator is used first; otherwise, DefaultBlobHeartbeatGenerator is used by default. |

Producer examples

Dependencies

<!-- Alibaba Cloud Credentials (credential management) -->
<dependency>
	<groupId>com.aliyun</groupId>
	<artifactId>credentials-java</artifactId>
	<version>1.0.2</version>
</dependency>

<dependency>
	<groupId>com.aliyun.datahub</groupId>
	<artifactId>datahub-client-library</artifactId>
	<version>1.4.11</version>
</dependency>

Asynchronous writing (recommended)

The advantage of asynchronous writing is that you do not need to batch data yourself; the batching behavior is configurable. Refer to the parameter description above for tuning.

public static void main(String[] args) throws InterruptedException {
	// Get AccessKey information through environment variables
	EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

	String endpoint ="https://dh-cn-hangzhou.aliyuncs.com";
	String projectName = "test_project";
	String topicName = "test_topic";

	// Initialize Producer with default configuration
	ProducerConfig config = new ProducerConfig(endpoint, provider);
	DatahubProducer producer = new DatahubProducer(projectName, topicName, config);

	RecordSchema schema = producer.getTopicSchema();
	// If multi-version schema is enabled, you can also get the schema of a specific version
	// RecordSchema schema = producer.getTopicSchema(3);

	// For asynchronous writing, you can register a callback function as needed
	WriteCallback callback = new WriteCallback() {
		@Override
		public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
			System.out.println("write success");
		}

		@Override
		public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
			System.out.println("write failed");
		}
	};

	for (int i = 0; i < 10000; ++i) {
		try {
			// generate data by schema
			TupleRecordData data = new TupleRecordData(schema);
			data.setField("field1", "hello");
			data.setField("field2", 1234);
			RecordEntry recordEntry = new RecordEntry();
			recordEntry.setRecordData(data);

			producer.sendAsync(recordEntry, callback);
			// If you don't need to know whether the data is sent successfully, you don't need to register a callback
			// producer.sendAsync(recordEntry, null);
		} catch (DatahubClientException e) {
			// TODO Handle exceptions, usually non-retriable errors or exceeded retry count
			Thread.sleep(1000);
		}
	}

	// Ensure all data is sent before exiting
	producer.flush(true);
	producer.close();
}

Hash writing

If data needs to be ordered, hash it based on some piece of information. Records with the same hash value are written to the same shard, and order is guaranteed within a single shard. We recommend implementing hash writing asynchronously.

public static void main(String[] args) throws InterruptedException {
    // Get AccessKey information through environment variables
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";

    // Initialize Producer with default configuration
    ProducerConfig config = new ProducerConfig(endpoint, provider);
    DatahubProducer producer = new DatahubProducer(projectName, topicName, config);


    RecordSchema schema = producer.getTopicSchema();
    // If multi-version schema is enabled, you can also get the schema of a specific version
    // RecordSchema schema = producer.getTopicSchema(3);

    // For asynchronous writing, you can register a callback function
    WriteCallback callback = new WriteCallback() {
        @Override
        public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
            System.out.println("write success");
        }

        @Override
        public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
            System.out.println("write failed");
        }
    };

    for (int i = 0; i < 10000; ++i) {
        try {
            // generate data by schema
            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "hello");
            data.setField("field2", 1234);
            RecordEntry recordEntry = new RecordEntry();
            recordEntry.setRecordData(data);
            // Set hash content for each record
            recordEntry.setHashKey("test" + i);

            producer.sendAsync(recordEntry, callback, DefaultRecordPartitioner.INSTANCE);
            // If you don't need to know whether the data is sent successfully, you don't need to register a callback
            // producer.sendAsync(recordEntry, null, DefaultRecordPartitioner.INSTANCE);
        } catch (DatahubClientException e) {
            // TODO Handle exceptions, usually non-retriable errors or exceeded retry count;
            Thread.sleep(1000);
        }
    }

    // Ensure all data is sent before exiting
    producer.flush(true);
    producer.close();
}

Synchronous writing

If you want to control the batching method yourself, you can use synchronous writing.

public static void main(String[] args) throws InterruptedException {
    // Get AccessKey information through environment variables
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";

    // Initialize Producer with default configuration
    ProducerConfig config = new ProducerConfig(endpoint, provider);
    DatahubProducer producer = new DatahubProducer(projectName, topicName, config);


    RecordSchema schema = producer.getTopicSchema();
    // If multi-version schema is enabled, you can also get the schema of a specific version
    // RecordSchema schema = producer.getTopicSchema(3);

    List<RecordEntry> recordEntryList = new ArrayList<>();
    for (int i = 0; i < 1000; ++i) {
        // generate data by schema
        TupleRecordData data = new TupleRecordData(schema);
        data.setField("field1", "hello");
        data.setField("field2", 1234);
        RecordEntry recordEntry = new RecordEntry();
        recordEntry.setRecordData(data);
        recordEntryList.add(recordEntry);
    }

    // Write failure will throw an exception, usually non-retriable errors or retriable errors that exceed retry count
    try {
        String shardId = producer.send(recordEntryList);
        System.out.println("write success, shardId: " + shardId);
    } catch (DatahubClientException e) {
        // TODO Handle exceptions, usually non-retriable errors or exceeded retry count;
    }

    producer.close();
}

Consumer overview

Consumer is used for data consumption. Shards can be allocated to Consumers automatically, which is generally called collaborative consumption. For more information, see Collaborative consumption.

Internally, Consumer reads data from the server in batches and caches it locally; the read interface then returns records one at a time.

Checkpoint maintenance

Consumer can maintain checkpoint information automatically. On startup, it retrieves the checkpoint saved on the server and resumes consumption from there. During consumption, it periodically (every 10 seconds by default) submits the client's current checkpoint to the server. The implementation logic is as follows:

Each record corresponds to a RecordKey object. After you consume a record, you can ack its RecordKey; an ack indicates that the record has been consumed and that the checkpoint can be advanced. You can also choose automatic ack. After the client reads records, it keeps each record's RecordKey in a queue, in read order. A background task periodically commits checkpoints to the server: on each run it pops RecordKeys from the front of the queue as long as they have been acked, stopping at the first un-acked RecordKey. The position of the last popped RecordKey, that is, the position just before the current front of the queue, is the checkpoint submitted to the server in that run.
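
The following is a simplified sketch of that background commit logic, not the SDK's actual implementation; the AckRecordKey class and commitCheckpoint method are illustrative stand-ins.

// Simplified illustration of the periodic checkpoint-commit logic described above.
// AckRecordKey and commitCheckpoint(...) are illustrative stand-ins, not SDK classes.
class CheckpointCommitter {
    static class AckRecordKey {
        final long sequence;        // position of the record on its shard
        volatile boolean acked;     // set to true when the user acks the record
        AckRecordKey(long sequence) { this.sequence = sequence; }
    }

    private final java.util.ArrayDeque<AckRecordKey> pending = new java.util.ArrayDeque<>();

    // Called by the read path: every returned record enqueues its key in read order.
    synchronized void onRead(AckRecordKey key) { pending.addLast(key); }

    // Called periodically (every 10 seconds by default): pop acked keys from the front
    // until the first un-acked key, then commit the position of the last popped key.
    synchronized void commitIfPossible() {
        AckRecordKey lastAcked = null;
        while (!pending.isEmpty() && pending.peekFirst().acked) {
            lastAcked = pending.pollFirst();
        }
        if (lastAcked != null) {
            commitCheckpoint(lastAcked.sequence);   // placeholder for the server-side commit
        }
    }

    private void commitCheckpoint(long sequence) {
        System.out.println("commit checkpoint at sequence " + sequence);
    }
}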

FAQ

1. If the client consumes a piece of data, but the checkpoint is not submitted to the server in time, will this data be consumed repeatedly?

Yes, but this generally only happens in abnormal exit situations. Normal exit by calling close can ensure that the current checkpoint is submitted.

2. If there are three pieces of data with checkpoints 1~3, and 2 is not acked for some reason, but 1 and 3 have been acked, what will the checkpoint be updated to?

The checkpoint will be updated to 1. Since 1 has been acked, 2 is now at the front of the queue and won't be popped, so the checkpoint will remain stuck at 1.

3. If a piece of data has been read through read, but has never been acked, will Consumer read this data again?

No, and the checkpoint will also remain stuck and not advance, so you must make sure that every record you read is acked. If a record is not acked within a certain time (60s by default), subsequent calls to read will throw an exception.

Limits

  • Consumer is thread-safe. Only one Consumer object is needed for each topic in a process.

  • The number of Consumers should generally not exceed the number of shards. If there are more Consumers than shards, some Consumers will be idle because they cannot be assigned shards, but they will start running normally when other Consumers exit and shards become available.

  • When consuming a specified list of shards, different Consumers with the same subscription ID cannot consume the same shard.

Parameter description

| Parameter name | Type | Required | Default value | Description |
| --- | --- | --- | --- | --- |
| maxAsyncThreadNum | int | No | 16 | Thread pool size for reading data. |
| userAgent | String | No | dcl-xxx | User agent. |
| maxRetryCount | int | No | 1 | Maximum number of retries. |
| maxRetryIntervalMs | int | No | 1000 | Retry interval for retriable errors, excluding throttling errors. |
| maxRetryIntervalMsForLimit | int | No | 100 | Retry interval after read throttling. |
| ProducerInterceptor | Object | No | - | Interceptor for additional processing when reading data, such as filtering sensitive information. |
| HttpConfig | Object | No | - | HTTP-related settings have many defaults; we recommend checking the code directly. |
| balanceRead | bool | No | false | true: read requests are sent to the shards consumed by the current consumer in turn. false: the shard with the oldest checkpoint among the consumed shards is chosen for the next read request, mainly to prevent large checkpoint gaps when data is skewed. |
| autoCommit | bool | No | true | Whether to ack data automatically. true: records are acked automatically after being read. false: after reading a record, you must call RecordEntry.getKey().ack(); otherwise, the checkpoint does not advance. |
| sessionTimeoutMs | long | No | 60000 | Maximum consumer session time. The consumer must keep sending heartbeats to the server to stay active. If no heartbeat is received for longer than this time, the server considers the consumer to have left the consumer group and assigns its shards to other consumers. |
| heartbeatRetryCount | int | No | 1 | Number of retries when the consumer fails to send a heartbeat to stay active. |
| fetchNumber | int | No | 500 | Maximum number of records to read in a single request. |
| maxBufferRecords | int | No | 500 | Number of records cached locally. When the cache runs low, new requests are sent to the server. Setting this too high may cause out-of-memory errors. |
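
Consumer parameters are set on ConsumerConfig in the same way that Producer parameters are set on ProducerConfig. The sketch below illustrates this; setAutoCommit appears in the manual-ack example later in this document, the commented-out setter is an assumption following the same setXxx naming pattern, and the subscription ID is a placeholder.

// Get AccessKey information through environment variables
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

// Create the Consumer configuration and override selected defaults
ConsumerConfig config = new ConsumerConfig("https://dh-cn-hangzhou.aliyuncs.com", provider);
config.setAutoCommit(false);      // documented in the manual-ack example below
// config.setFetchNumber(1000);   // assumed setter, following the setXxx naming pattern

DatahubConsumer consumer = new DatahubConsumer("test_project", "test_topic", "<subscription_id>", config);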

Consumer examples

Collaborative consumption (recommended)

Collaborative consumption is also known as Consumer Group. The server will dynamically allocate shards to each node for consumption. You only need to focus on data processing without worrying about checkpoint maintenance and shard allocation.

Auto-ack consumption

Each record is acked automatically as soon as it is read, which means its checkpoint can be committed before you finish processing it. In some cases this may cause data loss, for example if the process exits after a record is read but before it is processed.

public static void main(String[] args) throws InterruptedException {
    // Get AccessKey information through environment variables
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";
    String subId = "1747966903774M787N";

    ConsumerConfig config = new ConsumerConfig(endpoint, provider);
    DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);

    while (true) {
        RecordEntry recordEntry = null;
        try {
            recordEntry = consumer.read(5000);
            if (recordEntry != null) {
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                // handle data
                System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
            }
        } catch (DatahubClientException e) {
            // TODO Handle exceptions, usually non-retriable errors or exceeded retry count;
        }
    }
}

Manual ack consumption

If each record must be completely consumed before submitting the checkpoint, we recommend disabling autoCommit and manually acking each record.

public static void main(String[] args) throws InterruptedException {
  // Get AccessKey information through environment variables
  EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

  String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
  String projectName = "test_project";
  String topicName = "test_topic";
  String subId = "1747966903774M787N";

  ConsumerConfig config = new ConsumerConfig(endpoint, provider);
  // Set manual ack after data consumption succeeds
  config.setAutoCommit(false);
  DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);

  while (true) {
    RecordEntry recordEntry = null;
    try {
      recordEntry = consumer.read(5000);
      if (recordEntry != null) {
        TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
        // handle data
        System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
      }
    } catch (DatahubClientException e) {
      // TODO Handle exceptions, usually non-retriable errors or exceeded retry count;
    } finally {
      if (recordEntry != null) {
        // Each record must be acked after processing, otherwise the checkpoint cannot advance
        recordEntry.getKey().ack();
      }
    }
  }
}

Specified shard consumption

Specified shard consumption requires you to maintain shard allocation yourself. Different Consumers that use the same subscription ID cannot consume the same shard; otherwise, consumption fails. The following is an auto-ack example; for manual ack, refer to the example above.

public static void main(String[] args) throws InterruptedException {
    // Get AccessKey information through environment variables
    EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();

    String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
    String projectName = "test_project";
    String topicName = "test_topic";
    String subId = "1747966903774M787N";
    List<String> shardIds = Arrays.asList("0", "1");

    ConsumerConfig config = new ConsumerConfig(endpoint, provider);
    // Client specifies the list of shards to consume
    DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, shardIds, config);

    while (true) {
        RecordEntry recordEntry = null;
        try {
            recordEntry = consumer.read(5000);
            if (recordEntry != null) {
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                // handle data
                System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
            }
        } catch (DatahubClientException e) {
            // TODO Handle exceptions, usually non-retriable errors or exceeded retry count;
        }
    }
}