This topic describes the considerations and the exceptions that you may encounter when you use the DataHub Java SDK.
Initialization
You can access DataHub by using an Alibaba Cloud account. You must provide the AccessKey ID and AccessKey secret of your Alibaba Cloud account and the endpoint of DataHub. The following code shows how to create a DatahubClient by using a DataHub endpoint:
SDK 2.25.1 and later (recommended)
```java
// Create a DatahubClient instance that uses the new BATCH transmission protocol.
DatahubConfig.Protocol protocol = DatahubConfig.Protocol.BATCH;
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(
                // The protocol parameter is optional. If you do not set it, the PROTOBUF transmission protocol is used by default.
                new DatahubConfig(endpoint, new AliyunAccount(accessId, accessKey), protocol))
        .setHttpConfig(new HttpConfig().setCompressType(CompressType.ZSTD))
        .build();
```
Configuration description
DatahubConfig

| Name | Description |
| --- | --- |
| endpoint | The endpoint of DataHub. |
| account | The information about your Alibaba Cloud account. |
| protocol | The transmission protocol. Valid values: PROTOBUF and BATCH. Note: To use the BATCH transmission protocol, you must enable the multi-version feature. You can enable it in the console or by using the SDK. For more information, see DataHub cost-saving strategies. |
HttpConfig

| Name | Description |
| --- | --- |
| readTimeout | The socket read/write timeout period. Default value: 10 seconds. |
| connTimeout | The TCP connection timeout period. Default value: 10 seconds. |
| maxRetryCount | The number of retries after a request fails. Default value: 1. We recommend that you do not modify this value; retries should be handled at the upper business layer. |
| debugRequest | Specifies whether to print request logs. Default value: false. |
| compressType | The compression method for data transmission. Default value: LZ4. Valid values: LZ4, DEFLATE, and ZSTD. |
| proxyUri | The URI that is used to access the proxy server. |
| proxyUsername | The username that is used to log on to the proxy server. |
| proxyPassword | The password that is used to log on to the proxy server. |
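As noted for maxRetryCount, retries are best handled at the business layer rather than inside the HTTP client. The following is a minimal, generic sketch of such an upper-layer retry loop with a simple backoff; all names here are illustrative and are not part of the DataHub SDK:

```java
import java.util.concurrent.Callable;

public class RetryHelper {
    // Run a task, retrying up to maxRetry additional times with a growing backoff.
    // Callers decide which failures are worth retrying by what the task throws.
    public static <T> T runWithRetry(Callable<T> task, int maxRetry, long backoffMillis) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetry; ++attempt) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                // Back off a little longer after each failed attempt.
                Thread.sleep(backoffMillis * (attempt + 1));
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // A task that fails twice and then succeeds.
        String result = runWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("transient error");
            }
            return "ok";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```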
SDK statistics information
The SDK can collect statistics, such as queries per second (QPS), for requests such as put and get. You can enable this feature by using the following code:

```java
ClientMetrics.startMetrics();
```

By default, metric statistics are printed to log files, so you must configure slf4j. The metrics classes are in the com.aliyun.datahub.client.metrics package.
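For example, if you use Logback as the slf4j backend, a dedicated logger for the metrics package might look like the following. The appender name and file path are illustrative; adjust them to your own logging setup:

```xml
<configuration>
  <appender name="METRICS_FILE" class="ch.qos.logback.core.FileAppender">
    <file>logs/datahub-metrics.log</file>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss} %msg%n</pattern>
    </encoder>
  </appender>
  <!-- Route metric logs from the DataHub metrics package to their own file. -->
  <logger name="com.aliyun.datahub.client.metrics" level="INFO" additivity="false">
    <appender-ref ref="METRICS_FILE"/>
  </logger>
</configuration>
```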
Write data to DataHub
The following example shows how to write data to a tuple topic:
```java
public void writeTupleTopic(int maxRetry) {
    String shardId = "9";
    // Generate 10 records.
    List<RecordEntry> recordEntries = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // Set additional attributes for each record.
        recordEntry.addAttribute("key1", "value11");
        TupleRecordData data = new TupleRecordData(this.recordSchema);
        data.setField("field1", "Hello World");
        data.setField("field2", 1234567);
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
    }
    int retryNum = 0;
    while (retryNum < maxRetry) {
        try {
            // putRecordsByShard is supported by servers of version 2.12 and later. For earlier versions, use the putRecords interface.
            //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
            PutRecordsResult putRecordsResult = this.datahubClient.putRecords(Constant.projectName,
                    Constant.topicName, recordEntries);
            System.out.println("write tuple data successful");
            System.out.println(putRecordsResult.getPutErrorEntries());
            break;
        } catch (InvalidParameterException e) {
            // A parameter is invalid.
            e.printStackTrace();
            throw e;
        } catch (AuthorizationFailureException e) {
            // An AccessKey error occurred.
            e.printStackTrace();
            throw e;
        } catch (ResourceNotFoundException e) {
            // The project or topic is not found.
            e.printStackTrace();
            throw e;
        } catch (ShardSealedException e) {
            // The shard is in the CLOSED state and is read-only.
            e.printStackTrace();
            throw e;
        } catch (LimitExceededException e) {
            // The limit is exceeded. Retry the request.
            e.printStackTrace();
            retryNum++;
        } catch (DatahubClientException e) {
            // Another error occurred.
            e.printStackTrace();
            retryNum++;
        }
    }
}
```
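The example above assumes that this.recordSchema already holds a schema that matches the tuple topic. A minimal sketch of constructing such a schema on the client side is shown below; the field names and types are illustrative and must match your topic's actual schema, which you can usually also obtain from the server by calling datahubClient.getTopic(...).getRecordSchema():

```java
// Build the tuple schema used as this.recordSchema in the example above.
RecordSchema recordSchema = new RecordSchema();
recordSchema.addField(new Field("field1", FieldType.STRING));
recordSchema.addField(new Field("field2", FieldType.BIGINT));
```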
Create a subscription to consume DataHub data
```java
// Example of consuming data from a subscription and committing offsets during consumption.
public static void example() {
    String shardId = "0";
    List<String> shardIds = Arrays.asList("0", "1");
    OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
    SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
    // 1. Obtain the cursor of the current offset. If the current offset has expired,
    // or if no data has been consumed yet, obtain the cursor of the first record within the lifecycle.
    String cursor = null;
    // A sequence smaller than 0 indicates that no data has been consumed.
    if (subscriptionOffset.getSequence() < 0) {
        // Obtain the cursor of the first record within the lifecycle.
        cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
    } else {
        // Obtain the cursor of the next record.
        long nextSequence = subscriptionOffset.getSequence() + 1;
        try {
            // When you obtain a cursor by SEQUENCE, a SeekOutOfRange error may be returned,
            // which indicates that the data at the current cursor has expired.
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
        } catch (SeekOutOfRangeException e) {
            // Obtain the cursor of the first record within the lifecycle.
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
        }
    }
    // 2. Read records and commit offsets. This example reads tuple records and commits the offset every 1,000 records.
    long recordCount = 0L;
    // Read 10 records at a time.
    int fetchNum = 10;
    while (true) {
        try {
            GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
            if (getRecordsResult.getRecordCount() <= 0) {
                // No data is available. Sleep and then read again.
                Thread.sleep(1000);
                continue;
            }
            for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
                // Consume the data.
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                System.out.println("field1:" + data.getField("field1") + "\t"
                        + "field2:" + data.getField("field2"));
                // After the data is processed, record the offset.
                ++recordCount;
                subscriptionOffset.setSequence(recordEntry.getSequence());
                subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
                if (recordCount % 1000 == 0) {
                    // Commit the offset.
                    Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                    offsetMap.put(shardId, subscriptionOffset);
                    datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
                    System.out.println("commit offset successful");
                }
            }
            cursor = getRecordsResult.getNextCursor();
        } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
            // Exit. Offline: the subscription is offline. SessionInvalid: another client is consuming with the same subscription.
            break;
        } catch (SubscriptionOffsetResetException e) {
            // The offset has been reset. Obtain the SubscriptionOffset information again. The following shows a reset by sequence.
            // If the offset was reset by timestamp, obtain the cursor by using CursorType.SYSTEM_TIME instead.
            subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
            long nextSequence = subscriptionOffset.getSequence() + 1;
            cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
        } catch (DatahubClientException e) {
            // TODO: Decide whether to exit based on the specific exception.
        } catch (Exception e) {
            break;
        }
    }
}
```
Exception types
The Java SDK (version 2.12 and later) organizes DataHub errors into the exception types listed below. You can use the try-catch mechanism to capture each exception type and handle it accordingly.
Among these exception types, only DatahubClientException and LimitExceededException are retryable. DatahubClientException covers some retryable errors, such as server busy and server unavailable. Therefore, we recommend that you add retry logic to your code for DatahubClientException and LimitExceededException, but strictly limit the number of retries.
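The advice above can be sketched as a small classification loop: rethrow non-retryable errors immediately, and retry the rest a bounded number of times. To keep the sketch self-contained, the exception classes below are local stand-ins for the SDK's exception hierarchy, not the real com.aliyun.datahub.client.exception classes:

```java
public class RetryPolicyDemo {
    // Local stand-ins for the SDK exception hierarchy (illustrative only).
    static class DatahubClientException extends RuntimeException {
        DatahubClientException(String msg) { super(msg); }
    }
    static class LimitExceededException extends DatahubClientException {
        LimitExceededException(String msg) { super(msg); }
    }
    static class InvalidParameterException extends DatahubClientException {
        InvalidParameterException(String msg) { super(msg); }
    }

    // Retry only on retryable errors; rethrow everything else immediately.
    static String callWithRetry(java.util.function.Supplier<String> request, int maxRetry) {
        for (int attempt = 0; ; ++attempt) {
            try {
                return request.get();
            } catch (InvalidParameterException e) {
                // Not retryable: repeating the same invalid request cannot succeed.
                throw e;
            } catch (DatahubClientException e) {
                // Covers LimitExceededException and other retryable server errors.
                if (attempt >= maxRetry) {
                    throw e; // strictly limit the number of retries
                }
            }
        }
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // A request that is throttled once and then succeeds.
        String result = callWithRetry(() -> {
            if (++calls[0] < 2) {
                throw new LimitExceededException("throttled");
            }
            return "done";
        }, 3);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```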
The following table lists the exceptions in version 2.12 and later. The package path is com.aliyun.datahub.client.exception.
| Exception class name | Error code | Description |
| --- | --- | --- |
| InvalidParameterException |  | A parameter is set to an invalid value. |
| ResourceNotFoundException |  | The accessed resource does not exist. Note: If you send other requests immediately after a split or merge operation, this exception may be thrown. |
| ResourceAlreadyExistException |  | The resource already exists. This exception is thrown when you try to create a resource that already exists. |
| SeekOutOfRangeException |  | When you call getCursor, the specified sequence is not within the valid range (usually because the data has expired), or the specified timestamp is later than the current time. |
| AuthorizationFailureException |  | An error occurred while the Authorization signature was being parsed. Check whether the AccessKey is correct. |
| NoPermissionException |  | You do not have the required permissions. This is usually because the RAM configuration is incorrect or the RAM user is not properly authorized. |
| ShardSealedException |  | The shard is in the CLOSED state and is read-only. This exception is thrown when you continue to write data to a CLOSED shard or continue to read data after reading the last record. |
| LimitExceededException |  | The interface usage exceeds the limit. For more information, see Limits. |
| SubscriptionOfflineException |  | The subscription is offline and unavailable. |
| SubscriptionSessionInvalidException |  | The subscription session is invalid. When you use a subscription, a session is established to commit offsets. If another client uses the same subscription, this exception is thrown. |
| SubscriptionOffsetResetException |  | The subscription offset is reset. |
| MalformedRecordException |  | The record format is invalid. Possible causes: the schema is incorrect, the record contains non-UTF-8 characters, or the client uses Protocol Buffers but the server does not support it. |
| DatahubClientException | All other error codes. | The base class of all the preceding exceptions. If the preceding exceptions are excluded, the operation can usually be retried, but you must limit the number of retries. |