DataHub: SDK practice guide

Last Updated: Oct 27, 2025

This topic describes usage considerations and exception references for the DataHub Java SDK.

Initialization

You can access DataHub using an Alibaba Cloud account. You must provide the AccessKey ID and AccessKey secret of your Alibaba Cloud account and the endpoint of DataHub. The following code shows how to create a DatahubClient using the DataHub endpoint:

SDK 2.25.1 and later (recommended)

// Create a DatahubClient instance that uses the new BATCH transmission protocol
DatahubConfig.Protocol protocol = DatahubConfig.Protocol.BATCH;
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(
                // The protocol parameter is optional. If you do not set it, the PROTOBUF transmission protocol is used by default.
                new DatahubConfig(endpoint, new AliyunAccount(accessId, accessKey), protocol))
        .setHttpConfig(new HttpConfig().setCompressType(CompressType.ZSTD))
        .build();
  • Configuration description

    • DatahubConfig

      endpoint: The endpoint of DataHub.

      account: The information about your Alibaba Cloud account.

      protocol: The transmission protocol. Valid values: PROTOBUF and BATCH.

      Note: To use the BATCH transmission protocol, you must enable multiple versions. You can enable multiple versions in the console or using the SDK. For more information, see DataHub cost-saving strategies.

    • HttpConfig

      readTimeout: The socket read/write timeout period. Default value: 10s.

      connTimeout: The TCP connection timeout period. Default value: 10s.

      maxRetryCount: The number of retries after a request fails. Default value: 1. We recommend that you do not modify this parameter; retries should be handled by the upper business layer.

      debugRequest: Specifies whether to print request logs. Default value: false.

      compressType: The compression method for data transmission. Default value: lz4. Valid values: lz4, deflate, and zstd.

      proxyUri: The URI that is used to access the proxy server.

      proxyUsername: The username that is used to log on to the proxy server.

      proxyPassword: The password that is used to log on to the proxy server.

      A configuration sketch that sets several of these parameters follows this list.

  • SDK statistics information

    The SDK supports statistics such as queries per second (QPS) for requests such as put and get. You can enable this feature using the following code:

    ClientMetrics.startMetrics();

    By default, metric statistics are written to log files, so you must configure slf4j. The metrics classes are in the com.aliyun.datahub.client.metrics package.
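
As referenced in the HttpConfig description above, the following minimal sketch builds a client with explicit HTTP settings. The setter names (setReadTimeout, setConnTimeout, setProxyUri, and so on) and the millisecond units are assumptions inferred from the parameter names, and the proxy address and credentials are placeholders; verify all of them against the HttpConfig class in your SDK version.

// Minimal sketch, not the authoritative API: setter names and millisecond
// units are assumptions inferred from the HttpConfig parameter names above.
HttpConfig httpConfig = new HttpConfig()
        .setReadTimeout(10000)                     // socket read/write timeout
        .setConnTimeout(10000)                     // TCP connection timeout
        .setCompressType(CompressType.LZ4)         // lz4, deflate, or zstd
        .setProxyUri("http://192.0.2.10:8080")     // hypothetical proxy address
        .setProxyUsername("proxyUser")             // hypothetical credentials
        .setProxyPassword("proxyPass");

DatahubClient client = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(new DatahubConfig(endpoint, new AliyunAccount(accessId, accessKey)))
        .setHttpConfig(httpConfig)
        .build();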

Write data to DataHub

The following example shows how to write data to a tuple topic:

  public void writeTupleTopic(int maxRetry) {
    String shardId = "9";
    // Generate 10 records
    List<RecordEntry> recordEntries = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
      RecordEntry recordEntry = new RecordEntry();
      // Set additional attributes for each record
      recordEntry.addAttribute("key1", "value11");
      TupleRecordData data = new TupleRecordData(this.recordSchema);
      data.setField("field1", "Hello World");
      data.setField("field2", 1234567);
      recordEntry.setRecordData(data);
      recordEntry.setShardId(shardId);
      recordEntries.add(recordEntry);
    }

    int retryNum = 0;
    while (retryNum < maxRetry) {
      try {
        // putRecordsByShard is supported by server version 2.12 and later. For earlier versions, use the putRecords interface
        //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
        PutRecordsResult putRecordsResult = this.datahubClient.putRecords(Constant.projectName,
            Constant.topicName, recordEntries);
        System.out.println("write tuple data successful");
        System.out.println(putRecordsResult.getPutErrorEntries());
        break;
      } catch (InvalidParameterException e) {
        // A parameter is invalid. Do not retry
        e.printStackTrace();
        throw e;
      } catch (AuthorizationFailureException e) {
        // The AccessKey pair is invalid. Do not retry
        e.printStackTrace();
        throw e;
      } catch (ResourceNotFoundException e) {
        // The project or topic does not exist. Do not retry
        e.printStackTrace();
        throw e;
      } catch (ShardSealedException e) {
        // The shard is in the CLOSED state and is read-only. Do not retry
        e.printStackTrace();
        throw e;
      } catch (LimitExceededException e) {
        // The limit is exceeded. Retry
        e.printStackTrace();
        retryNum++;
      } catch (DatahubClientException e) {
        // Other errors. Retry
        e.printStackTrace();
        retryNum++;
      }
    }
  }
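
The write example assumes a this.recordSchema field that matches the topic schema (a STRING field1 and a numeric field2). The following minimal sketch shows two ways to obtain such a schema, assuming the standard model classes (RecordSchema, Field, FieldType) and that the getTopic result exposes the schema; verify both against your SDK version.

// Option 1: fetch the schema of an existing tuple topic from the server.
// Assumes the getTopic result exposes getRecordSchema().
RecordSchema recordSchema = datahubClient.getTopic(Constant.projectName, Constant.topicName)
        .getRecordSchema();

// Option 2: build the schema locally. The field names and types here are
// assumptions and must match the topic's actual schema.
RecordSchema localSchema = new RecordSchema();
localSchema.addField(new Field("field1", FieldType.STRING));
localSchema.addField(new Field("field2", FieldType.BIGINT));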

Create a subscription to consume DataHub data

The following example shows how to consume data using a subscription and commit offsets during consumption. The subId variable holds the subscription ID, and schema is the record schema of the topic.

// Example of consuming from an offset and committing offsets during consumption
public static void example() {
  String shardId = "0";
  List<String> shardIds = Arrays.asList("0", "1");
  OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
  SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
  // 1. Obtain the cursor for the current offset. If the offset has expired, or if no data has been consumed yet, obtain the cursor of the first record within the data lifecycle instead.
  String cursor = null;
  //sequence < 0 indicates that the data is not consumed
  if (subscriptionOffset.getSequence() < 0) {
      // Obtain the cursor of the first record within the lifecycle
      cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
  } else {
      // Obtain the cursor of the next record
      long nextSequence = subscriptionOffset.getSequence() + 1;
      try {
          //When you obtain a cursor by SEQUENCE, a SeekOutOfRange error may be returned, indicating that the data of the current cursor has expired
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (SeekOutOfRangeException e) {
          // Obtain the cursor of the first record within the lifecycle
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
      }
  }
  // 2. Read and save the offset. The following example shows how to read tuple data and save the offset every 1,000 records
  long recordCount = 0L;
  // Read 10 records each time
  int fetchNum = 10;
  while (true) {
      try {
          GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
          if (getRecordsResult.getRecordCount() <= 0) {
              // No data. Sleep and then read
              Thread.sleep(1000);
              continue;
          }
          for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
              //Consume data
              TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
              System.out.println("field1:" + data.getField("field1") + "\t"
                      + "field2:" + data.getField("field2"));
              // After the data is processed, set the offset
              ++recordCount;
              subscriptionOffset.setSequence(recordEntry.getSequence());
              subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
              if (recordCount % 1000 == 0) {
                  // Commit the offset
                  Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                  offsetMap.put(shardId, subscriptionOffset);
                  datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
                  System.out.println("commit offset successful");
              }
          }
          cursor = getRecordsResult.getNextCursor();
      } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
          // Exit. Offline: The subscription is offline. SubscriptionSessionInvalid: The subscription is being consumed by another client at the same time
          break;
      } catch (SubscriptionOffsetResetException e) {
          // The offset is reset. Obtain the SubscriptionOffset information again. The following example shows how to reset by sequence
          // If the offset is reset by timestamp, you need to obtain the cursor using CursorType.SYSTEM_TIME
          subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
          long nextSequence = subscriptionOffset.getSequence() + 1;
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (DatahubClientException e) {
          // TODO: Determine whether to exit based on different exceptions
      } catch (Exception e) {
          // Other exceptions, such as InterruptedException. Exit
          break;
      }
  }
}
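
The loop above commits offsets only every 1,000 records, so any records consumed after the last commit are re-read when consumption restarts. If that matters for your application, one option is a final commit when the loop exits normally. The following minimal sketch reuses the commitSubscriptionOffset call and the variables from the example above; skip it when the loop exited because the session became invalid or the subscription went offline, since the commit would fail in those cases.

// After the consumption loop exits normally, commit the last processed offset
// once more, so that at most the records since the previous commit are re-read
// on restart. Reuses subscriptionOffset as maintained by the loop above.
Map<String, SubscriptionOffset> finalOffsets = new HashMap<>();
finalOffsets.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, finalOffsets);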

Exception types

The Java SDK (version 2.12 and later) consolidates the exception types of DataHub. You can use the try-catch mechanism to capture specific exception types and handle them accordingly.

Among these exception types, only LimitExceededException and the base DatahubClientException represent retryable errors. DatahubClientException covers some transient failures, such as server busy and server unavailable. Therefore, we recommend that you add retry logic in your code when you encounter DatahubClientException or LimitExceededException, but strictly limit the number of retries.
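
As an illustration of this recommendation, the following sketch wraps a DataHub call with bounded retries. The helper name, the backoff values, and the use of java.util.function.Supplier are illustrative choices, not part of the SDK.

import java.util.function.Supplier;

// Illustrative helper: retry only the retryable cases, with a strict retry bound.
public static <T> T callWithRetry(Supplier<T> call, int maxRetry) {
    for (int attempt = 0; ; ++attempt) {
        try {
            return call.get();
        } catch (DatahubClientException e) {
            // Only LimitExceededException and the base DatahubClientException
            // (for example, server busy or server unavailable) are worth retrying;
            // other subclasses such as InvalidParameterException are rethrown immediately.
            boolean retryable = e instanceof LimitExceededException
                    || e.getClass() == DatahubClientException.class;
            if (!retryable || attempt >= maxRetry) {
                throw e;
            }
            try {
                Thread.sleep(1000L * (attempt + 1)); // simple linear backoff, illustrative only
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw e;
            }
        }
    }
}

// Example usage with the write call shown in the previous section:
// PutRecordsResult result = callWithRetry(
//         () -> datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries), 3);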

The following list describes the exceptions in version 2.12 and later. All of these exceptions are in the com.aliyun.datahub.client.exception package.

InvalidParameterException
  Error codes: InvalidParameter, InvalidCursor
  A parameter is set to an invalid value.

ResourceNotFoundException
  Error codes: ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo
  The accessed resource does not exist. Note: If you send other requests immediately after a Split or Merge operation, this exception may be thrown.

ResourceAlreadyExistException
  Error codes: ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist
  The resource already exists. This exception is thrown when you try to create a resource that already exists.

SeekOutOfRangeException
  Error code: SeekOutOfRange
  When you call getCursor, the specified sequence is not within the valid range (usually because the data has expired), or the specified timestamp is later than the current time.

AuthorizationFailureException
  Error code: Unauthorized
  An error occurred while parsing the Authorization signature. Check whether the AccessKey pair is correct.

NoPermissionException
  Error codes: NoPermission, OperationDenied
  You do not have the required permission. This is usually because the RAM configuration is incorrect or the RAM user is not properly authorized.

ShardSealedException
  Error code: InvalidShardOperation
  The shard is in the CLOSED state and is read-only. This exception is thrown when you continue to write data to a CLOSED shard or continue to read data after reading the last record.

LimitExceededException
  Error code: LimitExceeded
  The interface usage exceeds the limit. For more information, see Limits.

SubscriptionOfflineException
  Error code: SubscriptionOffline
  The subscription is offline and unavailable.

SubscriptionSessionInvalidException
  Error codes: OffsetSessionChanged, OffsetSessionClosed
  A subscription session exception occurred. When you use a subscription, a session is established to commit offsets. If another client uses the same subscription, this exception is thrown.

SubscriptionOffsetResetException
  Error code: OffsetReseted
  The subscription offset has been reset.

MalformedRecordException
  Error codes: MalformedRecord, ShardNotReady
  The record format is invalid. Possible causes: the schema is incorrect, the record contains non-UTF-8 characters, or the client uses Protocol Buffers but the server does not support it.

DatahubClientException
  Error codes: all other error codes
  The base class of all the preceding exceptions.

Errors other than the specific exceptions above surface as the base DatahubClientException and can usually be retried. However, you must limit the number of retries.