Overview
The Java High-Level SDK, commonly referred to as client-library, is primarily divided into Producer and Consumer components. This document introduces the related parameters and common usage patterns for both components.
Authentication
An AccessKey pair is provided by Alibaba Cloud and used to complete identity authentication when you call API operations. Each AccessKey pair consists of an AccessKey ID and an AccessKey secret. You must keep the AccessKey pair confidential. If your AccessKey pair is leaked, the security of all resources in your account is threatened. When accessing Alibaba Cloud OpenAPI, hardcoding the AccessKey pair in your code may lead to AccessKey leakage due to improper repository permission management.
Alibaba Cloud Credentials is an identity credential management tool provided by Alibaba Cloud for developers. After configuring the Credentials default credential chain, you do not need to hardcode the AccessKey pair in your code when accessing Alibaba Cloud OpenAPI, which can effectively ensure the security of cloud resources in your account.
Prerequisites
You have obtained the AccessKey ID and AccessKey secret of a Resource Access Management (RAM) user. For more information, see View the AccessKey information of a RAM user.
You have added the Cloud SDK Credentials dependency (we recommend using the latest version of Credentials):
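For example, with Maven (the coordinates below are taken from the Dependencies section later in this document; adjust the version to the latest release):
<!-- Alibaba Cloud Credentials dependency; update the version as needed -->
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>credentials-java</artifactId>
    <version>1.0.2</version>
</dependency>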
Configuration options
This document uses environment variables to obtain AccessKey information. For more options, see Manage access credentials.
When using the configuration file option, make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET do not exist in your system. Otherwise, the configuration file will not take effect.
Alibaba Cloud SDK supports creating default access credentials by defining the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. When you call an API operation, the system reads the AccessKey pair from the default credential and uses the AccessKey pair to complete authentication.
Configuration methods
Configure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET.
Linux and macOS configuration method
Execute the following commands:
export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
Replace <access_key_id> with your prepared AccessKey ID and <access_key_secret> with your AccessKey secret.
Windows configuration method
Create the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET and set them to your prepared AccessKey ID and AccessKey secret, then restart Windows.
Code example
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
Producer overview
Limits
Producer is thread-safe. Theoretically, only one Producer is needed for a topic within the same process.
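Because the producer is thread-safe, multiple threads can share a single instance. The following is a minimal sketch under that assumption: producer and schema are created as in the asynchronous example later in this document, the thread and record counts are illustrative only, and the java.util.concurrent imports are assumed.
// Sketch: several worker threads share one thread-safe DatahubProducer.
// producer and schema are assumed to be created as in the asynchronous example below.
static void writeConcurrently(DatahubProducer producer, RecordSchema schema) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    for (int t = 0; t < 4; ++t) {
        executor.submit(() -> {
            for (int i = 0; i < 1000; ++i) {
                TupleRecordData data = new TupleRecordData(schema);
                data.setField("field1", "hello");
                data.setField("field2", 1234);
                RecordEntry recordEntry = new RecordEntry();
                recordEntry.setRecordData(data);
                // No callback registered here; pass one if you need per-batch results
                producer.sendAsync(recordEntry, null);
            }
        });
    }
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.MINUTES);
    // Flush remaining buffered records before the caller closes the producer
    producer.flush(true);
}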
Parameter description
Producer parameters are set through ProducerConfig. For example, to set the maxAsyncThreadNum parameter, call the setMaxAsyncThreadNum method of ProducerConfig (see the sketch after the table below).
| Parameter name | Type | Required | Default value | Description |
| --- | --- | --- | --- | --- |
| maxAsyncThreadNum | int | No | 16 | Thread pool size for sending data |
| userAgent | String | No | dcl-xxx | - |
| maxRetryCount | int | No | 1 | Maximum number of retries |
| maxRetryIntervalMs | int | No | 1000 | Retry interval for retriable errors, excluding throttling errors |
| maxRetryIntervalMsForLimit | int | No | 100 | Retry interval after write throttling |
| ProducerInterceptor | Object | No | - | Interceptor that performs additional processing when writing data, such as adding extra attribute information |
| HttpConfig | Object | No | - | HTTP-related configuration. The defaults are numerous; we recommend checking the code directly |
| maxAsyncBufferRecords | int | No | INT_MAX | Maximum number of records in a batch for asynchronous sending. Batching is usually controlled by size, so the default is INT_MAX |
| maxAsyncBufferTimeMs | long | No | 10000 | Maximum cache time for asynchronous sending, in milliseconds |
| maxAsyncBufferSize | long | No | 4 * 1024 * 1024 | Maximum batch size for asynchronous sending, in bytes |
| maxAsyncQueueSize | long | No | 16 | Maximum number of batched requests in flight in asynchronous mode. When this is exceeded, the send interface blocks, mainly to prevent out-of-memory errors |
| useTTFormat | | | | |
| enableHeartbeat | bool | No | false | Whether to send heartbeat packets. Generally not needed |
| heartbeatGenerator | Object | No | DefaultBlobHeartbeatGenerator | Used when heartbeat sending is enabled. A user-configured heartbeatGenerator takes precedence; otherwise DefaultBlobHeartbeatGenerator is used |
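A minimal sketch of adjusting a few of the parameters above through ProducerConfig setters. Only setMaxAsyncThreadNum is named in this document; the other setter names are assumptions that follow the same set<ParameterName> convention, so verify them against your SDK version.
// Sketch: tune producer behaviour through ProducerConfig setters.
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
ProducerConfig config = new ProducerConfig(endpoint, provider);
config.setMaxAsyncThreadNum(16);       // documented above: thread pool size for sending data
config.setMaxRetryCount(3);            // assumed setter for maxRetryCount
config.setMaxAsyncBufferTimeMs(5000);  // assumed setter for maxAsyncBufferTimeMs
DatahubProducer producer = new DatahubProducer("test_project", "test_topic", config);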
Producer examples
Dependencies
<!-- Zero trust credential related -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>credentials-java</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>datahub-client-library</artifactId>
<version>1.4.11</version>
</dependency>
Asynchronous writing (recommended)
The advantage of asynchronous writing is that you do not need to batch data yourself; the batching behavior can be configured through parameters. Refer to the parameter description above for tuning.
public static void main(String[] args) throws InterruptedException {
// Get AccessKey information through environment variables
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint ="https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
// Initialize Producer with default configuration
ProducerConfig config = new ProducerConfig(endpoint, provider);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = producer.getTopicSchema();
// If multi-version schema is enabled, you can also get the schema of a specific version
// RecordSchema schema = producer.getTopicSchema(3);
// For asynchronous writing, you can register a callback function as needed
WriteCallback callback = new WriteCallback() {
@Override
public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
System.out.println("write success");
}
@Override
public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
System.out.println("write failed");
}
};
for (int i = 0; i < 10000; ++i) {
try {
// generate data by schema
TupleRecordData data = new TupleRecordData(schema);
data.setField("field1", "hello");
data.setField("field2", 1234);
RecordEntry recordEntry = new RecordEntry();
recordEntry.setRecordData(data);
producer.sendAsync(recordEntry, callback);
// If you don't need to know whether the data is sent successfully, you don't need to register a callback
// producer.sendAsync(recordEntry, null);
} catch (DatahubClientException e) {
// TODO Handle exceptions, usually non-retriable errors or exceeded retry count;
Thread.sleep(1000);
}
}
// Ensure all data is sent before exiting
producer.flush(true);
producer.close();
}
Hash writing
If data needs to be ordered, hash it based on certain information. Data with the same hash value is written to the same shard, and data within a single shard is guaranteed to be ordered. We recommend implementing hash writing asynchronously.
public static void main(String[] args) throws InterruptedException {
// Get AccessKey information through environment variables
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
// Initialize Producer with default configuration
ProducerConfig config = new ProducerConfig(endpoint, provider);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = producer.getTopicSchema();
// If multi-version schema is enabled, you can also get the schema of a specific version
// RecordSchema schema = producer.getTopicSchema(3);
// For asynchronous writing, you can register a callback function
WriteCallback callback = new WriteCallback() {
@Override
public void onSuccess(String shardId, List<RecordEntry> records, long elapsedTimeMs, long sendTimeMs) {
System.out.println("write success");
}
@Override
public void onFailure(String shardId, List<RecordEntry> records, long elapsedTimeMs, DatahubClientException e) {
System.out.println("write failed");
}
};
for (int i = 0; i < 10000; ++i) {
try {
// generate data by schema
TupleRecordData data = new TupleRecordData(schema);
data.setField("field1", "hello");
data.setField("field2", 1234);
RecordEntry recordEntry = new RecordEntry();
recordEntry.setRecordData(data);
// Set hash content for each record
recordEntry.setHashKey("test" + i);
producer.sendAsync(recordEntry, callback, DefaultRecordPartitioner.INSTANCE);
// If you don't need to know whether the data is sent successfully, you don't need to register a callback
// producer.sendAsync(recordEntry, null, DefaultRecordPartitioner.INSTANCE);
} catch (DatahubClientException e) {
// TODO Handle exceptions, usually non-retriable errors or exceeded retry count;
Thread.sleep(1000);
}
}
// Ensure all data is sent before exiting
producer.flush(true);
producer.close();
}
Synchronous writing
If you want to control the batching method yourself, you can use synchronous writing.
public static void main(String[] args) throws InterruptedException {
// Get AccessKey information through environment variables
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
// Initialize Producer with default configuration
ProducerConfig config = new ProducerConfig(endpoint, provider);
DatahubProducer producer = new DatahubProducer(projectName, topicName, config);
RecordSchema schema = producer.getTopicSchema();
// If multi-version schema is enabled, you can also get the schema of a specific version
// RecordSchema schema = producer.getTopicSchema(3);
List<RecordEntry> recordEntryList = new ArrayList<>();
for (int i = 0; i < 1000; ++i) {
// generate data by schema
TupleRecordData data = new TupleRecordData(schema);
data.setField("field1", "hello");
data.setField("field2", 1234);
RecordEntry recordEntry = new RecordEntry();
recordEntry.setRecordData(data);
recordEntryList.add(recordEntry);
}
// Write failure will throw an exception, usually non-retriable errors or retriable errors that exceed retry count
try {
String shardId = producer.send(recordEntryList);
System.out.println("write success, shardId: " + shardId);
} catch (DatahubClientException e) {
// TODO Handle exceptions, usually non-retriable errors or exceeded retry count;
}
producer.close();
}
Consumer overview
Consumer is used for data consumption and can automatically allocate shards, generally called collaborative consumption. For more information, see Collaborative consumption.
Consumer actually reads data in batches, caches it locally, and then returns data one by one at the interface level.
Checkpoint maintenance
Consumer can automatically maintain checkpoint information. When starting, it will automatically retrieve the checkpoint saved on the server and continue consuming from the last saved checkpoint. During consumption, it will periodically (default is 10 seconds) submit the client's data checkpoint to the server. The specific implementation logic is as follows:
Each record corresponds to a RecordKey object. After you finish processing a record, you ack its RecordKey, which marks the record as consumed and allows the checkpoint to advance; you can also choose automatic ack. After the client reads data, it keeps each record's RecordKey in a queue in read order. A background task periodically submits checkpoints to the server: each time it scans the queue, it pops RecordKeys from the front as long as they have been acked, and stops at the first RecordKey that has not been acked. The checkpoint of the last popped RecordKey (the position just before the current front of the queue) is the checkpoint submitted to the server in that round.
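The following is a simplified sketch of that commit logic, for illustration only; AckEntry, the acked flag, and the queue shown here are hypothetical stand-ins for the SDK's internal bookkeeping, not its public API (java.util.Deque assumed).
// Illustration only: simplified model of the periodic checkpoint commit.
static String computeCheckpointToCommit(Deque<AckEntry> queue) {
    String lastAcked = null;
    // Pop entries from the front of the queue as long as they have been acked.
    while (!queue.isEmpty() && queue.peekFirst().acked) {
        lastAcked = queue.pollFirst().checkpoint;
    }
    // The checkpoint of the last popped entry is submitted to the server this round
    // (null means there is nothing new to commit).
    return lastAcked;
}

// Hypothetical per-record bookkeeping entry (stand-in for RecordKey state).
static class AckEntry {
    final String checkpoint;
    volatile boolean acked;
    AckEntry(String checkpoint) { this.checkpoint = checkpoint; }
}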
FAQ
1. If the client consumes a piece of data, but the checkpoint is not submitted to the server in time, will this data be consumed repeatedly?
Yes, but this generally only happens in abnormal exit situations. Normal exit by calling close can ensure that the current checkpoint is submitted.
2. If there are three pieces of data with checkpoints 1~3, and 2 is not acked for some reason, but 1 and 3 have been acked, what will the checkpoint be updated to?
The checkpoint will be updated to 1. Since 1 has been acked, 2 is now at the front of the queue and won't be popped, so the checkpoint will remain stuck at 1.
3. If a piece of data has been read through read, but has never been acked, will Consumer read this data again?
No, and the checkpoint will also remain stuck and not update, so users must ensure that every piece of data read is acked. If a piece of data exceeds a certain time (default is 60s) without being acked, continuing to call read will throw an exception.
Limits
Consumer is thread-safe. Only one Consumer object is needed for each topic in a process.
The number of Consumers should generally not exceed the number of shards. If there are more Consumers than shards, some Consumers will be idle because they cannot be assigned shards, but they will start running normally when other Consumers exit and shards become available.
When consuming a specified list of shards, different Consumers with the same subscription ID cannot consume the same shard.
Parameter description
| Parameter name | Type | Required | Default value | Description |
| --- | --- | --- | --- | --- |
| maxAsyncThreadNum | int | No | 16 | Thread pool size for reading data |
| userAgent | String | No | dcl-xxx | - |
| maxRetryCount | int | No | 1 | Maximum number of retries |
| maxRetryIntervalMs | int | No | 1000 | Retry interval for retriable errors, excluding throttling errors |
| maxRetryIntervalMsForLimit | int | No | 100 | Retry interval after read throttling |
| ProducerInterceptor | Object | No | - | Interceptor that performs additional processing when reading data, such as filtering sensitive information |
| HttpConfig | Object | No | - | HTTP-related configuration. The defaults are numerous; we recommend checking the code directly |
| balanceRead | bool | No | false | true: read requests are sent to the shards consumed by the current consumer in turn. false: the shard with the oldest checkpoint among the shards consumed by the current consumer is selected for each read request, mainly to prevent large checkpoint gaps in data skew scenarios |
| autoCommit | bool | No | true | Whether data is automatically acked. true: data is acked automatically after it is read. false: after data is read, you must call RecordEntry.getKey().ack() manually, otherwise the checkpoint does not advance |
| sessionTimeoutMs | long | No | 60000 | Maximum consumer session time. The consumer must keep sending heartbeats to the server to stay active; if no heartbeat arrives within this time, the server considers the consumer to have left the consumer group and assigns its shards to other consumers |
| heartbeatRetryCount | int | No | 1 | Number of retries when the consumer fails to send a heartbeat to stay active |
| fetchNumber | int | No | 500 | Maximum number of records read in a single request |
| maxBufferRecords | int | No | 500 | Number of records cached locally. When the cache is insufficient, requests are sent to the server. Setting this too high may cause out-of-memory errors |
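A minimal sketch of adjusting consumer parameters through ConsumerConfig setters. setAutoCommit is used in the manual-ack example below; the other setter names are assumptions following the same set<ParameterName> convention, so verify them against your SDK version. The subscription ID is a placeholder.
// Sketch: tune consumer behaviour through ConsumerConfig setters.
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
ConsumerConfig config = new ConsumerConfig(endpoint, provider);
config.setAutoCommit(false);           // used in the manual-ack example below
config.setFetchNumber(100);            // assumed setter for fetchNumber
config.setSessionTimeoutMs(120000);    // assumed setter for sessionTimeoutMs
DatahubConsumer consumer = new DatahubConsumer("test_project", "test_topic", "<your_sub_id>", config);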
Consumer examples
Collaborative consumption (recommended)
Collaborative consumption is also known as Consumer Group. The server will dynamically allocate shards to each node for consumption. You only need to focus on data processing without worrying about checkpoint maintenance and shard allocation.
Auto-ack consumption
Each record is automatically acked as soon as it is read, which allows the checkpoint to advance before processing completes. This may cause data loss in some cases.
public static void main(String[] args) throws InterruptedException {
// Get AccessKey information through environment variables
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
String subId = "1747966903774M787N";
ConsumerConfig config = new ConsumerConfig(endpoint, provider);
DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);
while (true) {
RecordEntry recordEntry = null;
try {
recordEntry = consumer.read(5000);
if (recordEntry != null) {
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
// handle data
System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
}
} catch (DatahubClientException e) {
// TODO Handle exceptions, usually non-retriable errors or exceeded retry count;
}
}
}
Manual ack consumption
If each record must be completely consumed before submitting the checkpoint, we recommend disabling autoCommit and manually acking each record.
public static void main(String[] args) throws InterruptedException {
// Get AccessKey information through environment variables
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
String subId = "1747966903774M787N";
ConsumerConfig config = new ConsumerConfig(endpoint, provider);
// Set manual ack after data consumption succeeds
config.setAutoCommit(false);
DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, config);
while (true) {
RecordEntry recordEntry = null;
try {
recordEntry = consumer.read(5000);
if (recordEntry != null) {
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
// handle data
System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
}
} catch (DatahubClientException e) {
// TODO Handle exceptions, usually non-retriable errors or exceeded retry count;
} finally {
if (recordEntry != null) {
// Each record must be acked after processing, otherwise the checkpoint cannot advance
recordEntry.getKey().ack();
}
}
}
}
Specified shard consumption
Specified shard consumption requires you to maintain shard allocation yourself. Different Consumers with the same subscription ID cannot consume the same shard, otherwise consumption will fail. Here is an example of auto-ack. For manual ack, refer to the example above.
public static void main(String[] args) throws InterruptedException {
// Get AccessKey information through environment variables
EnvironmentVariableCredentialProvider provider = EnvironmentVariableCredentialProvider.create();
String endpoint = "https://dh-cn-hangzhou.aliyuncs.com";
String projectName = "test_project";
String topicName = "test_topic";
String subId = "1747966903774M787N";
List<String> shardIds = Arrays.asList("0", "1");
ConsumerConfig config = new ConsumerConfig(endpoint, provider);
// Client specifies the list of shards to consume
DatahubConsumer consumer = new DatahubConsumer(projectName, topicName, subId, shardIds, config);
while (true) {
RecordEntry recordEntry = null;
try {
recordEntry = consumer.read(5000);
if (recordEntry != null) {
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
// handle data
System.out.println("read record: " + data.getField("field1") + ", " + data.getField("field2"));
}
} catch (DatahubClientException e) {
// TODO Handle exceptions, usually non-retriable errors or exceeded retry count;
}
}
}