All Products
Search
Document Center

DataHub:DataHub SDK for Java

Last Updated:Jan 18, 2023

DataHub SDK for Java

1. Maven dependencies and JDK

Maven POM

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.19.0-public</version>
</dependency>

JDK version jdk: >= 1.8

2. Usage notes

  1. If your current SDK version is updated from V2.9, the setTimestampInms method is replaced by another method. Take note that the timestamp value in the new version is that in V2.9 multiplied by 1,000.

  2. In general, the putRecords or putRecordsByShard, and getRecords methods are the most frequently called to read and write data. Other methods, such as getTopic, getCursor, and listShard, are called only during initialization.

  3. You can initialize one or more DataHub clients in a project. Multiple DataHub clients can be concurrently used.

  4. Different packages may contain classes of the same names but in different directories. DataHub SDK for Java V2.12 uses the classes in the com.aliyun.datahub.client package, whereas the classes of the same names in other packages are provided for versions earlier than V2.12. Examples:

  5. The com.aliyun.datahub.client.model.RecordSchema package is used for DataHub SDK for Java V2.12.

  6. The com.aliyun.datahub.common.data.RecordSchema package contains the code of DataHub SDK for Java whose version is earlier than V2.12. If you update your SDK version to V2.12 or later but do not modify the code, the code in the package can still be used.

  7. If the "Parse body failed, Offset: 0" error occurs, you can set the enableBinary parameter to false.

3. Use DataHub SDK for Java

Initialization

You can use an Alibaba Cloud account to access DataHub. To access DataHub, you must provide your AccessKey ID and AccessKey secret, and the endpoint that is used to access DataHub. The following sample code provides an example on how to create a DataHub client by using a DataHub endpoint:

// In this example, an endpoint of the China (Hangzhou) region is used. You can also use an endpoint of another region as needed.
String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
String accessId = "<YourAccessKeyId>";
String accessKey = "<YourAccessKeySecret>";
// Create a DataHub client.
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(
                new DatahubConfig(endpoint,
                        // Specify whether to enable binary data transmission. In DataHub SDK for Java V2.12 and later, the server supports binary data transmission.
                        new AliyunAccount(accessId, accessKey), true))
                        // If an error occurs in Apsara Stack DataHub, set this parameter to false.
        // The HttpConfig parameter is optional. If you do not set the HttpConfig parameter, the default value is used.
        .setHttpConfig(new HttpConfig()
                .setCompressType(HttpConfig.CompressType.LZ4) // When you read data from or write data to DataHub, we recommend that you use the LZ4 compression algorithm for data transmission.
                .setConnTimeout(10000))
        .build();

Configuration description: DatahubConfig

Parameter

Description

endpoint

The endpoint that is used to access DataHub.

account

The information about the Alibaba Cloud account.

enableBinary

Specifies whether to perform binary data transmission. In DataHub SDK for Java V2.12 and later, the server supports binary data transmission. If the SDK version is earlier than V2.12, set this parameter to false. If the "Parse body failed, Offset:0" error occurs in Apsara Stack DataHub, set this parameter to false.

HttpConfig

Parameter

Description

readTimeout

The timeout period of a socket read/write. Unit: seconds. Default value: 10.

connTimeout

The timeout period of a TCP connection. Unit: seconds. Default value: 10.

maxRetryCount

The maximum number of retries after a request failure. Default value: 1. We recommend that you do not change the value. The upper business layer performs retries.

debugRequest

Specifies whether to display the request logs. Default value: false.

compressType

The compression mode for data transmission. By default, no compression mode is used. LZ4 and deflate compression modes are supported.

proxyUri

The uniform resource identifier (URI) of the proxy host.

proxyUsername

The username that is verified by the proxy server.

proxyPassword

The password that is verified by the proxy server.

SDK statistics You can use DataHub SDK for Java to collect statistics on the data read/write requests, such as the queries that are initiated per second. You can call the following method to collect statistics:

ClientMetrics.startMetrics();

By default, the statistics on metrics are displayed in log files. In this case, you must configure Simple Logging Facade for Java (SLF4J). The following metric package is used: com.aliyun.datahub.client.metrics.

Write data to DataHub

In the following example, data is written to a tuple topic in DataHub.

// Write tuple records.
    public static void tupleExample(String project,String topic,int retryTimes) {
        // Obtain the schema.
        RecordSchema recordSchema = datahubClient.getTopic(project,topic ).getRecordSchema();
        // Generate 10 records.
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int i = 0; i < 10; ++i) {
            RecordEntry recordEntry = new RecordEntry();
            // You can specify additional attributes, such as the IP address and machine name of the server, for each record. If you do not specify additional attributes, data writing is not affected.
            recordEntry.addAttribute("key1", "value1");

            TupleRecordData data = new TupleRecordData(recordSchema);
            data.setField("field1", "HelloWorld");
            data.setField("field2", 1234567);
            recordEntry.setRecordData(data);
            recordEntries.add(recordEntry);
        }
        try {
            PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
            int i = result.getFailedRecordCount();
            if (i > 0) {
                retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
            }
        }  catch (DatahubClientException e) {
            System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
        }
    }
    // The retry mechanism.
    public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
        boolean suc = false;
        while (retryTimes != 0) {
            retryTimes = retryTimes - 1;
            PutRecordsResult recordsResult = client.putRecords(project, topic, records);
            if (recordsResult.getFailedRecordCount() > 0) {
                retry(client,recordsResult.getFailedRecords(),retryTimes,project,topic);
            }
            suc = true;
            break;
        }
        if (!suc) {
            System.out.println("retryFailure");
        }
    }

Create a subscription to consume DataHub data

// The following sample code provides an example on how to consume data from a saved offset and submit offsets during consumption.
public static void example() {
  String shardId = "0";
  List<String> shardIds = Arrays.asList("0", "1");
  OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(Constant.projectName, Constant.topicName, subId, shardIds);
  SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
  // 1. Obtain the cursor of the record at the current offset. If the record expires or is not consumed, obtain the cursor of the first record within the time to live (TTL) of the topic.
  String cursor = null;
  // If the sequence number is smaller than 0, the record is not consumed.
  if (subscriptionOffset.getSequence() < 0) {
      // Obtain the cursor of the first record within the TTL of the topic.
      cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
  } else {
      // Obtain the cursor of the next record.
      long nextSequence = subscriptionOffset.getSequence() + 1;
      try {
          // If the SeekOutOfRange error is returned after you obtain the cursor based on the sequence number, the record of the current cursor expires.
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (SeekOutOfRangeException e) {
          // Obtain the cursor of the first record within the TTL of the topic.
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
      }
  }
  // 2. Read records and save offsets. For example, read tuple records and save an offset each time 1,000 records are read.
  long recordCount = 0L;
  // Read 10 records each time.
  int fetchNum = 10;
  while (true) {
      try {
          GetRecordsResult getRecordsResult = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, schema, cursor, fetchNum);
          if (getRecordsResult.getRecordCount() <= 0) {
              // If no record can be read, pause the thread for 1,000 ms and continue to read records.
              Thread.sleep(1000);
              continue;
          }
          for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
              // Consume data.
              TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
              System.out.println("field1:" + data.getField("field1") + "\t"
                      + "field2:" + data.getField("field2"));
              // Save the offset after the data is consumed.
              ++recordCount;
              subscriptionOffset.setSequence(recordEntry.getSequence());
              subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
              if (recordCount % 1000 == 0) {
                  // Submit the offset.
                  Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                  offsetMap.put(shardId, subscriptionOffset);
                  datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsetMap);
                  System.out.println("commit offset successful");
              }
          }
          cursor = getRecordsResult.getNextCursor();
      } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
          // The subscription session is exited. Offline: The subscription is offline. SubscriptionSessionInvalid: The subscription is also used on other clients.
          break;
      } catch (SubscriptionOffsetResetException e) {
          // The offset is reset. You must obtain the offset information of the subscription again. In this example, the sequence number is rest.
          // If the timestamp is rest, you must use the CursorType.SYSTEM_TIME parameter to obtain the cursor.
          subscriptionOffset = datahubClient.getSubscriptionOffset(Constant.projectName, Constant.topicName, subId, shardIds).getOffsets().get(shardId);
          long nextSequence = subscriptionOffset.getSequence() + 1;
          cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (DatahubClientException e) {
          // TODO: Specify whether to exit when an error occurs.
      } catch (Exception e) {
          break;
      }
  }
}

4.Error types

This section describes the types of errors related to DataHub SDK for Java V2.12 and later. You can configure a try-catch mechanism to catch the error type and process errors. Only DatahubClientException and LimitExceededException errors can be resolved by retires. Some DatahubClientException errors, such as the errors that are caused because the server is busy or unavailable, can be resolved by retries. We recommend that you add retry logic to the code for DatahubClientException and LimitExceededException errors. However, the number of retries must be limited. The following table describes the types of errors related to DataHub SDK for Java V2.12 and later. The error files are stored in the following package: com.aliyun.datahub.client.exception.

Error type

Error message

Description

InvalidParameterException

InvalidParameter, InvalidCursor

The error message returned because a specified parameter is invalid.

ResourceNotFoundException

ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo

The error message returned because the resource to be accessed does not exist. If you immediately send another request after you split or merge shards, this error message is returned.

ResourceAlreadyExistException

ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist

The error message returned because the resource already exists. If the resource that you want to create already exists, this error message is returned.

SeekOutOfRangeException

SeekOutOfRange

The error message returned because the specified sequence number is invalid or the specified timestamp is later than the current timestamp when you obtain the cursor. The sequence number may become invalid because the record of the cursor expires.

AuthorizationFailureException

Unauthorized

The error message returned because an error occurs when the authorization signature is being parsed. Check whether the AccessKey pair is valid.

NoPermissionException

NoPermission, OperationDenied

The error message returned because you do not have permissions. Check whether the RAM configurations are valid or the RAM user is authorized.

ShardSealedException

InvalidShardOperation

The error message returned because the shard is closed and data cannot be read from or written to the shard. If you continue to write data to the shard or continue to read data after the last data record is read from the shard, this error message is returned.

LimitExceededException

LimitExceeded

The error message returned because the limits of DataHub SDK for Java have been exceeded. For more information, see Limits.

SubscriptionOfflineException

SubscriptionOffline

The error message returned because the subscription is offline and cannot be used.

SubscriptionSessionInvalidException

OffsetSessionChanged, OffsetSessionClosed

The error message returned because the subscription session is abnormal. When a subscription is used, a session is established to submit offsets. If the subscription is also used on another client, this error message is returned.

SubscriptionOffsetResetException

OffsetReseted

The error message returned because the offset of a subscription is reset.

MalformedRecordException

MalformedRecord,ShardNotReady

The error message returned because the record format is invalid. This may be caused because the schema is invalid, non-UTF-8 characters exist, or the client uses the protocol buffer (PB) protocol but the server does not support the PB protocol.

DatahubClientException

All other errors. This error type is the base class of all errors.

The error message returned because the error does not fall in the preceding error types. This type of error can be resolved by retries. However, the number of retries must be limited.

5.Methods

Manage projects

A project is a basic unit for managing data in DataHub. A project contains multiple topics. The projects in DataHub are independent of those in MaxCompute. You cannot reuse MaxCompute projects in DataHub. You must create projects in DataHub.

Create a project

Note

Syntax: CreateProjectResult createProject(String projectName, String comment)

When you create a project, you must set the project name and enter the project description. The project name must be 3 to 32 characters in length, and can contain letters, digits, and underscores (_). The project name must start with a letter and is not case-sensitive.

  • Parameters

    • projectName: the name of the project.

    • comment: the comments on the project.

  • Errors

    • DatahubClientException

  • Sample code

public static void createProject(String projectName,String projectComment) {
  try {
      datahubClient.createProject(projectName, projectComment);
      System.out.println("create project successful");
  } catch (DatahubClientException e) {
      System.out.println(e.getErrorMessage());
  }
}

Delete a project

Note

Syntax: DeleteProjectResult deleteProject(String projectName). Make sure that the project contains no topic before you delete the project. Parameters: projectName: the name of the project.

  • Errors

    • DatahubClientException

    • NoPermissionException: If the project contains a topic, this error is returned.

  • Sample code

public static void deleteProject(String projectName) {
  try {
      datahubClient.deleteProject(projectName);
      System.out.println("delete project successful");
  } catch (DatahubClientException e) {
      System.out.println(e.getErrorMessage());
  }
}

Update a project

Note

Syntax: UpdateProjectResult updateProject(String projectName, String comment). You can update only the comments on a project. Parameters: projectName: the name of the project. comment: the comments on the project.

  • Errors

    • DatahubClientException

  • Sample code

public static void updateProject(String projectName,String newComment) {
    try {
        datahubClient.updateProject(projectName, newComment);
        System.out.println("update project successful");
    } catch (DatahubClientException e) {
        System.out.println("other error");
    }
}

List projects

Note

Syntax: ListProjectResult listProject(). The return result of the listProject method is a ListProjectResult object, which contains a list of project names.

  • Parameters: none

  • Errors

    • DatahubClientException

  • Sample code

public static void listProject() {
    try {
        ListProjectResult listProjectResult = datahubClient.listProject();
        if (listProjectResult.getProjectNames().size() > 0) {
            for (String pName : listProjectResult.getProjectNames()) {
                System.out.println(pName);
            }
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Query a project

Note

Syntax: GetProjectResult getProject(String projectName). You can call the getProject method to view the attribute information of the current project. Parameters: projectName: the name of the project.

  • Errors

    • DatahubClientException

  • Sample code

public static void getProject(String projectName) {
    try {
        GetProjectResult getProjectResult = datahubClient.getProject(projectName );
        System.out.println(getProjectResult.getCreateTime() + "\t"
                + getProjectResult.getLastModifyTime() + "\t"
                + getProjectResult.getComment());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Manage topics

A topic is the smallest unit for data subscription and publishing in DataHub. You can use topics to distinguish different types of streaming data. Two types of topics are supported: tuple and blob.

  1. You can write a block of binary data as a record to blob topics.

  2. Tuple topics contain records that are similar to data records in databases. Each record contains multiple columns. You must specify record schemas for tuple topics because the data in tuple topics is transmitted as strings over the network. Therefore, schemas are required for data type conversion. The following table describes the data types that are supported.

Type

Description

Value range

BIGINT

An eight-byte signed integer.

-9223372036854775807 to 9223372036854775807.

DOUBLE

A double-precision floating-point number. It is eight bytes in length.

-1.0 _10^308 to 1.0 _10^308.

BOOLEAN

The Boolean type.

True and False, true and false, or 0 and 1.

TIMESTAMP

The type of timestamp.

A timestamp that is accurate to microseconds.

STRING

A string. Only UTF-8 encoding is supported.

The size of all values in a column of the STRING type cannot exceed 2 MB.

TINYINT

A single-byte integer.

-128 to 127.

SMALLINT

A double-byte integer.

-32768 to 32767.

INTEGER

A four-byte integer.

-2147483648 to 2147483647.

FLOAT

A four-byte single-precision floating-point number.

-3.40292347_10^38 to 3.40292347_10^38.

DataHub SDK for Java V2.16.1-public and later support TINYINT, SMALLINT, INTEGER, and FLOAT.

Create a tuple topic

Note

Syntax: CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, String comment)

  • Parameters

    • projectName: the name of the project in which you want to create the topic.

    • topicName: the name of the topic.

    • shardCount: the number of initial shards in the topic.

    • lifeCycle: the TTL of the data. Unit: days. The data that is written before that time is not accessible.

    • recordType: the type of record that you want to write. Valid values: TUPLE and BLOB.

    • recordSchema: the record schema for the topic.

    • comment: the comments on the topic.

  • Errors

    • DatahubClientException

  • Sample code

 public static void createTupleTopic(String projectName, String topicName, int shardCount, int lifeCycle,  String topicComment) {
   RecordSchema schema = new RecordSchema();
   schema.addField(new Field("bigint_field", FieldType.BIGINT));
   schema.addField(new Field("double_field", FieldType.DOUBLE));
   schema.addField(new Field("boolean_field", FieldType.BOOLEAN));
   schema.addField(new Field("timestamp_field", FieldType.TIMESTAMP));
   schema.addField(new Field("tinyint_field", FieldType.TINYINT));
   schema.addField(new Field("smallint_field", FieldType.SMALLINT));
   schema.addField(new Field("integer_field", FieldType.INTEGER));
   schema.addField(new Field("floar_field", FieldType.FLOAT));
   schema.addField(new Field("decimal_field", FieldType.DECIMAL));
   schema.addField(new Field("string_field", FieldType.STRING));
   try {
       datahubClient.createTopic(projectName,topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicComment);
       System.out.println("create topic successful");
   } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
   }
 }

Create a blob topic

Note

Syntax: CreateTopicResult createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, String comment)

  • Parameters

    • projectName: the name of the project in which you want to create the topic.

    • topicName: the name of the topic.

    • shardCount: the number of initial shards in the topic.

    • lifeCycle: the TTL of the data. Unit: days. The data that is written before that time is not accessible.

    • recordType: the type of record that you want to write. Valid values: TUPLE and BLOB.

    • comment: the comments on the topic.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • ResourceAlreadyExistException

  • Sample code

public static void createBlobTopic(String projectName, String topicName, int shardCount, int lifeCycle,  String topicComment) {
  try {
      datahubClient.createTopic(projectName, blobTopicName, shardCount, lifeCycle, RecordType.BLOB, topicComment);
      System.out.println("create topic successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
  }

Delete a topic

Make sure that the topic contains no subscription or DataConnector before you delete the topic. Otherwise, the NoPermission error is reported.

Note

Syntax: DeleteTopicResult deleteTopic(String projectName, String topicName)

  • Parameters

    • projectName: the name of the project in which you want to delete the topic.

    • topicName: the name of the topic.

  • Errors

    • DatahubClientException

    • NoPermissionException: If the topic contains a subscription or DataConnector, this error is returned.

  • Sample code

public static void deleteTopic(String projectName, String topicName) {
  try {
      datahubClient.deleteTopic(projectName, topicName);
      System.out.println("delete topic successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

  }
}

List topics

Note

Syntax: ListTopicResult listTopic(String projectName)

  • Parameters

    • projectName: the name of the project in which you want to list projects.

  • Sample code

   public static void listTopic(String projectName ) {
      try {
          ListTopicResult listTopicResult = datahubClient.listTopic(projectName);
          if (listTopicResult.getTopicNames().size() > 0) {
              for (String tName : listTopicResult.getTopicNames()) {
                  System.out.println(tName);
              }
          }
      } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());          
      }
  }

Update a topic

You can update the comments on and TTL of a topic.

Note

Syntax: UpdateTopicResult updateTopic(String projectName, String topicName, int lifeCycle, String comment)

  • Parameters

    • projectName: the name of the project in which you want to update the topic.

    • topicName: the name of the topic.

    • comment: the comments to be updated.

    • lifeCycle: the TTL of the topic.

  • Errors

    • DatahubClientException

  • Sample code

   public static void updateTopic(String projectName, String topicName, int lifeCycle, String comment) {
        try {
            comment = "new topic comment";
             lifeCycle = 1;
            datahubClient.updateTopic(projectName, Constant.topicName,lifeCycle, comment);
            System.out.println("update topic successful");
            // View the updated results.
            GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
            System.out.println(getTopicResult.getComment());
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }

Query a topic

Note

Syntax: GetTopicResult getTopic(String projectName, String topicName). You can call the getTopic method to obtain the attribute information about a topic.

  • Parameters

    • projectName: the name of the project in which you want to query the topic.

    • topicName :the name of the topic.

  • Errors

    • DatahubClientException

  • Sample code

   public static void getTopic(String projectName, String topicName) {
        try {
            GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
            System.out.println(getTopicResult.getShardCount() + "\t"
                    + getTopicResult.getLifeCycle() + "\t"
                    + getTopicResult.getRecordType() + "\t"
                    + getTopicResult.getComment());
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());

        }
    }

Add fields to a tuple topic

You can add a single field or add multiple fields at a time.

Note

Syntax: AppendFieldResult appendField(String projectName, String topicName, Field field)

  • Parameters

    • projectName: the name of the project where the topic to which you want to add fields resides.

    • topicName: the name of the topic.

    • fields: the field to be added. All fields can be set to null.

  • Errors

    • DatahubClientException

  • Sample code

public static void appendNewField(String projectName,String topicName) {
    try {
        Field newField = new Field("newField", FieldType.STRING, true,"comment");
        datahubClient.appendField(projectName, topicName, newField);
        System.out.println("append field successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}
Note

AppendFieldResult appendField(String projectName, String topicName, List fields);

  • Parameters

    • projectName: the name of the project where the topic to which you want to add fields resides.

    • topicName:the name of the topic.

    • fields: the fields to be added. All fields can be set to null.

  • Errors

    • DatahubClientException

  • Sample code

    public static void appendNewField(String projectName,String topicName) {
        try {
            List<Field>  list = new ArrayList<>();
            Field newField1 = new Field("newField1", FieldType.STRING, true,"comment");
            list.add(newField1);
            datahubClient.appendField(projectName, topicName, list);
            System.out.println("append field successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());

        }
    }
Note

Manage shards

Shards are concurrent tunnels used for data transmission in a topic. Each shard has an ID. A shard can be in different states. Opening: The shard is being started. Active: The shard is started and can be used to provide services. Each active shard consumes server resources. We recommend that you create shards as needed.

List shards

Note

Syntax: ListShardResult listShard(String projectName, String topicName)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

  • Errors

    • DatahubClientException

  • Sample code

public static void listShard(String projectName, String topicName) {
  try {
      ListShardResult listShardResult = datahubClient.listShard(projectName, topicName);
      if (listShardResult.getShards().size() > 0) {
          for (ShardEntry entry : listShardResult.getShards()) {
              System.out.println(entry.getShardId() + "\t"
                      + entry.getState() + "\t"
                      + entry.getLeftShardId() + "\t"
                      + entry.getRightShardId());
          }
      }
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

  }
}

Split a shard

You can split an active shard of a specified topic. After a shard is split, two active new shards are generated, whereas the original shard is closed. You can only read data from, but not write data to, a closed shard. You can use the default split key or specify a split key to split a shard.

Note

Syntax: SplitShardResult splitShard(String projectName, String topicName, String shardId), or SplitShardResult splitShard(String projectName, String topicName, String shardId, String splitKey)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard to be split.

    • splitKey: the split key that is used to split the shard.

  • Errors

    • DatahubClientException

  • Sample code

public static void splitShard(String projectName, String topicName, String shardId) {
    try {
        shardId = "0";
        SplitShardResult splitShardResult = datahubClient.splitShard(projectName, topicName, shardId);
        for (ShardEntry entry : splitShardResult.getNewShards()) {
            System.out.println(entry.getShardId());
        }
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

Merge shards

The two active shards to be merged in a topic must be adjacent to each other. For more information about the two adjacent shards of a shard, see the result that is returned by the listShard method.

Note

Syntax: MergeShardResult mergeShard(String projectName, String topicName, String shardId, String adjacentShardId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard to be merged.

    • adjacentShardId: the ID of the shard that is adjacent to the specified shard.

  • Errors

    • DatahubClientException

  • Sample code

public static void mergeShard() {
    try {
        String shardId = "7";
        // The values of the adjacentShardId and shardId parameters must be adjacent. For more information about the adjacent shards of a shard, see the result that is returned by the listShard method.
        String adjacentShardId = "8";
        MergeShardResult mergeShardResult = datahubClient.mergeShard(Constant.projectName, Constant.topicName, shardId, adjacentShardId);
        System.out.println("merge successful");
        System.out.println(mergeShardResult.getShardId());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

Extend shards

The number of shards to be extended must be greater than or equal to the original shard quantity.

Note

Syntax: ExtendShardResult extendShard(String projectName, String topicName, int shardCount)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardCount: the number of shards to be extended.

    • adjacentShardId: the ID of the shard that is adjacent to the specified shard.

  • Errors

    • DatahubClientException

  • Sample code

        public static void extendTopic(String projectName, String topicName, int shardCount) {
            try {
                ExtendShardResult extendShardResult = datahubClient.extendShard(projectName, topicName, shardCount);
    
            } catch (DatahubClientException e) {
                System.out.println(e.getErrorMessage());
            }
    
        }

Read and write data

You can read data from active and closed shards. However, you can write data only to active shards.

Read data

To read data, you must first obtain a cursor and then pass in the cursor value to the getRecords method. Alternatively, you can use the subscription feature of DataHub to directly associate a subscription to consume data. In this case, the server automatically saves consumption offsets. If you want to sample data to view data quality, you can read data.

Obtain a cursor

To read data from a topic, specify a shard and the cursor from which data starts to be read. You can obtain the cursor by using the following methods: OLDEST, LATEST, SEQUENCE, and SYSTEM_TIME.

  • OLDEST: the cursor that points to the earliest valid record in the specified shard.

  • LATEST: the cursor that points to the latest record in the specified shard.

  • SEQUENCE: the cursor that points to the record of the specified sequence number.

  • SYSTEM_TIME: the cursor that points to the first record whose timestamp value is greater than or equal to the specified timestamp value.

Select a method to obtain the cursor

The data to be read must be valid, which means that the data must be within the TTL. Otherwise, an error is reported.

  • Scenario 1: Read data from the beginning of a shard. In this case, we recommend that you use the OLDEST method. If all the data in the shard is valid, data starts to be read from the first record.

  • Scenario 2: Sample data to check whether the data whose timestamp value is greater than the specified timestamp value is valid. In this case, we recommend that you use the SYSTEM_TIME method. Data starts to be read from the first record that is subsequent to the record at the specified timestamp.

  • Scenario 3: View the latest data information. In this case, we recommend that you use the LATEST method. You can use this method to read the latest record or the latest N records. To obtain the latest N records, you must first obtain the sequence number of the latest record. Then, identify the previous N records of the latest record. The first sequence number of the previous N records is the sequence number of the latest record minus N.

Note

Syntax: GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type), or GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • CursorType: the type of cursor.

  • Errors

    • DatahubClientException

    • SeekOutOfRangeException

  • Sample code

    • If you want to sample data, convert the time to a timestamp. Then, obtain the cursor.

public static void getcursor(String projectName,String topicName) {
    String shardId = "5";
    try {
        // Convert the time to a timestamp.
        String time = "2019-07-01 10:00:00";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long timestamp = 0L;
        try {
            Date date = simpleDateFormat.parse(time);
            timestamp = date.getTime(); // Obtain the timestamp that corresponds to the time.
            //System.out.println(timestamp);
        } 
        // Obtain the cursor from which data starts to be read after the timestamp.
        String timeCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
        System.out.println("get cursor successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());  
    } catch (ParseException e) {
            System.out.println(e.getErrorOffset());
        }
}
  • Read data from the earliest record in the specified shard.

public static void getcursor(String projectName,String topicName) {
    String shardId = "5";
    try {
        /* Use the OLDEST method. */
        String oldestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
        System.out.println("get cursor successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}
  • Read the latest data that is written to the specified shard, which involves the following two scenarios:

    • Read the latest record that is written to the specified shard.

    • Read the latest N records that are written to the specified shard.

      • You must first obtain the sequence number of the latest record and then obtain the cursor.

public static void getcursor(String projectName,String topicName) {
    String shardId = "5";
    try {
        /* Use the LATEST method. */
        String latestCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getCursor();
        /* Use the SEQUENCE method. */
        // Obtain the sequence number of the latest record.
        long seq = datahubClient.getCursor(projectName, topicName, shardId, CursorType.LATEST).getSequence();
        // Obtain the cursor for reading the latest 10 records.
        String seqCursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
        }
         catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }

}

Data reading method

Note

Syntax: GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit), or GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • schema: the schema that is required when you read records from a tuple topic.

    • cursor: the cursor from which data starts to be read.

    • limit: the maximum number of records to be read.

  • Errors

    • DatahubClientException

  • Sample code

Read records from a tuple topic

 public static void example(String projectName,String topicName) {
    // The maximum number of records to be read each time.
     int recordLimit = 1000;
     String shardId = "7";
     // Obtain the cursor of the earliest valid record.
     // Note: In general, you call the getCursor method only during initialization. After that, you can call the getNextCursor method to continue to consume data.
     String cursor = "";
     try {
         cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
     }  catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());         
     }
     while (true) {
         try {
             GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, recordSchema, cursor, recordLimit);
             if (result.getRecordCount() <= 0) {
                // If no record can be read, pause the thread for 10,000 ms and continue to read records.
                 Thread.sleep(10000);
                 continue;
             }
             for (RecordEntry entry : result.getRecords()) {
                 TupleRecordData data = (TupleRecordData) entry.getRecordData();
                 System.out.println("field1:" + data.getField("field1") + "\t"
                         + "field2:" + data.getField("field2"));
             }
             // Obtain the next cursor.
             cursor = result.getNextCursor();
         } catch (InvalidCursorException ex) {
             // The cursor is invalid or has expired. Specify another cursor to start consumption.
             cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
         }  catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());;

         } 
     }
 }

Read records from a blob topic

public static void example(String projectName,String topicName) {
    // The maximum number of records to be read each time.
    int recordLimit = 1000;
    String shardId = "7";
    // Obtain the cursor of the earliest valid record.
    // Note: In general, you call the getCursor method only during initialization. After that, you can call the getNextCursor method to continue to consume data.
    String cursor = "";
    try {
        cursor = datahubClient.getCursor(projectName, blobTopicName, shardId, CursorType.OLDEST).getCursor();
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());     
    }
    while (true) {
        try {
            GetRecordsResult result = datahubClient.getRecords(projectName, blobTopicName, shardId, recordSchema, cursor, recordLimit);
            if (result.getRecordCount() <= 0) {
                // If no record can be read, pause the thread for 10,000 ms and continue to read records.
                Thread.sleep(10000);
                continue;
            }
            /* Consume data. */
            for (RecordEntry record: result.getRecords()){
                 BlobRecordData data = (BlobRecordData) record.getRecordData();
                 System.out.println(new String(data.getData()));
            }
            // Obtain the next cursor.
            cursor = result.getNextCursor();
        } catch (InvalidCursorException ex) {
            // The cursor is invalid or has expired. Specify another cursor to start consumption.
            cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage()); 
        } 
    }
}

Write data

In DataHub SDK for Java V2.12 and later, the server supports the PutRecordsByShardResult method. In versions earlier than V2.12, the server supports the putRecords method. To call the putRecordsByShard method, you must specify the shard to which you want to write data. Otherwise, data is written to the first active shard by default. The input parameters of the preceding two methods are a list of records of the same type, such as the tuple or blob type. DataHub SDK for Java allows you to write data by shard by calling the putRecordsByShard method or write data in hybrid mode by calling the putRecords method. You can write data to DataHub by shard in DataHub SDK for Java V2.12 and later. If you call the putRecords method to write data to DataHub, you must check the return result to determine whether data is written to DataHub. If you call the putRecordsByShard method to write data to DataHub but data fails to be written, an error is reported. If your server supports the putRecordsByShard method, we recommend that you use the putRecordsByShard method.

Note

Syntax: PutRecordsResult putRecords(String projectName, String topicName, List records), or PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • records: the list of records to be written to DataHub.

  • Errors

    • DatahubClientException

Write records to a tuple topic

// Write tuple records.
    public static void tupleExample(String project,String topic,int retryTimes) {
        // Obtain the schema.
        RecordSchema recordSchema = datahubClient.getTopic(project,topic ).getRecordSchema();
        // Generate 10 records.
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int i = 0; i < 10; ++i) {
            RecordEntry recordEntry = new RecordEntry();
            // You can specify additional attributes, such as the IP address and machine name of the server, for each record. If you do not specify additional attributes, data writing is not affected.
            recordEntry.addAttribute("key1", "value1");
            TupleRecordData data = new TupleRecordData(recordSchema);
            data.setField("field1", "HelloWorld");
            data.setField("field2", 1234567);
            recordEntry.setRecordData(data);
            recordEntries.add(recordEntry);
        }
        try {
            PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
            int i = result.getFailedRecordCount();
            if (i > 0) {
                retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
            }
        }  catch (DatahubClientException e) {
            System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
        }
    }
    // The retry mechanism.
    public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
        boolean suc = false;
        while (retryTimes != 0) {
            retryTimes = retryTimes - 1;
            PutRecordsResult recordsResult = client.putRecords(project, topic, records);
            if (recordsResult.getFailedRecordCount() > 0) {
                retry(client,recordsResult.getFailedRecords(),retryTimes,project,topic);
            }
            suc = true;
            break;
        }
        if (!suc) {
            System.out.println("retryFailure");
        }
    }
'''Java
<br />
<br />** Write records to a blob topic**<br />

'''Java
// Write blob records.
public static void blobExample() {
    // Generate 10 records.
    List<RecordEntry> recordEntries = new ArrayList<>();
    String shardId = "4";
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // Specify additional attributes for each record.
        recordEntry.addAttribute("key1", "value1");
        BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
        recordEntry.setShardId("0");
    }
    while (true) {
        try {
            // In DataHub SDK for Java V2.12 and later, the server supports the PutRecordsByShardResult method. If your SDK version is earlier than V2.12, use the putRecords method.
            //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
            datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
            System.out.println("write data  successful");
            break;
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());            
        }
    }
}

Write records in various modes

If your SDK version is earlier than V2.12, you can write records only by calling the putRecords method. The RecordEntry class contains the following three attributes: shardId, partitionKey, and hashKey. You can specify the values of the preceding attributes to determine the shard to which records are written.

Note

In DataHub SDK for Java V2.12 and later, we recommend that you call the putRecordsByShard method to write records. This prevents the performance loss that is caused by repartitioning on the server.

Write records by shard ID. This mode is recommended. Sample code:

RecordEntry entry = new RecordEntry();
entry.setShardId("0");

Write records by hash key. In this mode, specify a 128-bit message-digest algorithm 5 (MD5) value. If you write records by hash key, the values of the BeginHashKey and EndHashKey parameters are used to determine the shard to which records are written. Sample code:

RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");

Write records by partition key. In this mode, specify a string as the partition key. Then, the shard to which records are written is determined based on the MD5 value of the string and the values of the BeginHashKey and EndHashKey parameters. Sample code:

RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");

Query metering information

Query metering information

Note

Syntax: GetMeterInfoResult getMeterInfo(String projectName, String topicName, String shardId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void getMeter(String projectName,String topicName) {
  String shardId = "5";
  try {
      GetMeterInfoResult getMeterInfoResult = datahubClient.getMeterInfo(projectName, topicName, shardId);
      System.out.println("get meter successful");
      System.out.println(getMeterInfoResult.getActiveTime() + "\t" + getMeterInfoResult.getStorage());
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());      
  }
}

Manage subscriptions

DataHub allows the server to save the consumption offsets of a subscription. You can obtain highly available offset storage services by performing simple configurations.

Create a subscription

Note

Syntax: CreateSubscriptionResult createSubscription(String projectName, String topicName, String comment)

The comments on a subscription are in the following format: {"application":"Application","description":"Description"}.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • comment: the comments on the subscription.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void createSubscription(String projectName,String topicName) {
  try {
      CreateSubscriptionResult createSubscriptionResult = datahubClient.createSubscription(projectName, topicName, Constant.subscribtionComment);
      System.out.println("create subscription successful");
      System.out.println(createSubscriptionResult.getSubId());
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());      
  }
}

Delete a subscription

Note

Syntax: DeleteSubscriptionResult deleteSubscription(String projectName, String topicName, String subId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void deleteSubscription(String projectName,String topicName,String subId) {
  try {
      datahubClient.deleteSubscription(projectName, topicName, subId);
      System.out.println("delete subscription successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());      
  }
}

Update a subscription

You can update only the comments on an existing subscription.

Note

Syntax: UpdateSubscriptionResult updateSubscription(String projectName, String topicName, String subId, String comment)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • comment: the comments to be updated.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void updateSubscription(String projectName, String topicName, String subId, String comment){
        try {
            datahubClient.updateSubscription(projectName,topicName,subId,comment)
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());

        }

    }

List subscriptions

The pageNum and pageSize parameters of the listSubscription method specify the range of subscriptions to be listed. For example, you can set the pageNum and pageSize parameters to 1 and 10 to list the first 10 subscriptions. For another example, you can set the pageNum and pageSize parameters to 2 and 5 to list the sixth to tenth subscriptions.

Note

Syntax: ListSubscriptionResult listSubscription(String projectName, String topicName, int pageNum, int pageSize)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • pageNum: the number of the page to return.

    • pageSize: the number of entries to return on each page.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

  • Sample code

public static void listSubscription(String projectName, String topicName, int pageNum, int pageSize) {
    try {
        ListSubscriptionResult listSubscriptionResult = datahubClient.listSubscription(projectName, topicName, pageNum, pageSize);
        if (listSubscriptionResult.getSubscriptions().size() > 0) {
            System.out.println(listSubscriptionResult.getTotalCount());
            System.out.println(listSubscriptionResult.getSubscriptions().size());
            for (SubscriptionEntry entry : listSubscriptionResult.getSubscriptions()) {
                System.out.println(entry.getSubId() + "\t"
                        + entry.getState() + "\t"
                        + entry.getType() + "\t"
                        + entry.getComment());
            }
        }
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

Query a subscription

Note

Syntax: GetSubscriptionResult getSubscription(String projectName, String topicName, String subId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

public static void getSubscription(String projectName, String topicName, String subId) {
    try {
        GetSubscriptionResult getSubscriptionResult = datahubClient.getSubscription(projectName, topicName, subId);
        System.out.println(getSubscriptionResult.getSubId() + "\t"
                + getSubscriptionResult.getState() + "\t"
                + getSubscriptionResult.getType() + "\t"
                + getSubscriptionResult.getComment());
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

Update the status of a subscription

A subscription can be in the OFFLINE or ONLINE state, which indicates an offline or online subscription.

Note

Syntax: UpdateSubscriptionStateResult updateSubscriptionState(String projectName, String topicName, String subId, SubscriptionState state)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • state: the state to be updated.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void updateSubscriptionState(String projectName, String topicName,String subId) {
    try {
        datahubClient.updateSubscriptionState(projectName, topicName, subId, SubscriptionState.ONLINE);
        System.out.println("update subscription state successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());        
    }
}

Manage offsets

After a subscription is created, it is initially unconsumed. To use the offset storage feature of the subscription, perform the following operations on offsets:

Initialize an offset

To initialize an offset, you need to call the openSubscriptionSession method only once. The second time you call this method, a new consumption session ID is generated. In this case, the previous session becomes invalid, and you cannot submit offsets.

Note

Syntax: OpenSubscriptionSessionResult openSubscriptionSession(String projectName, String topicName, String subId, List shardIds)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • shardIds: the IDs of shards.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void openSubscriptionSession(String projectName, String topicName) {
    shardId = "4";
    shardIds = new ArrayList<String>();
    shardIds.add("0");
    shardIds.add("4");
    try {
        OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
        SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
        System.out.println(subscriptionOffset.getSessionId() + "\t"
                + subscriptionOffset.getVersionId() + "\t"
                + subscriptionOffset.getSequence());
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());      
    }
}

Obtain an offset

Note

Syntax: GetSubscriptionOffsetResult getSubscriptionOffset(String projectName, String topicName, String subId, List shardIds)

The return result of the getSubscriptionOffset method is a GetSubscriptionOffsetResult object, which is basically the same as the return result of the openSubscriptionSession method. However, the GetSubscriptionOffsetResult object does not contain the session ID of the offset. You can call the getSubscriptionOffset method only to view the information about the offset.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • shardIds: the IDs of shards.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

// Obtain an offset.
public static void getSubscriptionOffset(String projectName, String topicName,String subId) {
    shardId = "4";
    shardIds = new ArrayList<String>();
    shardIds.add("0");
    shardIds.add("4");
    try {
        GetSubscriptionOffsetResult getSubscriptionOffsetResult = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds);
        SubscriptionOffset subscriptionOffset = getSubscriptionOffsetResult.getOffsets().get(shardId);
        System.out.println(subscriptionOffset.getVersionId() + "\t"
                + subscriptionOffset.getSequence());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

Submit an offset

Note

Syntax: CommitSubscriptionOffsetResult commitSubscriptionOffset(String projectName, String topicName, String subId, Map offsets)

When you submit an offset, DataHub verifies the values of the versionId and sessionId parameters. Make sure that the values are the same as those in the current session. The offset information to be submitted is not limited. We recommend that you enter the actual sequence number and timestamp of the record.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • offsets: the offset map of shards.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • SubscriptionOffsetResetException

    • SubscriptionSessionInvalidException

    • SubscriptionOfflineException

  • Sample code

// Submit an offset.
public static void commitSubscriptionOffset(String projectName, String topicName,String subId) {
  while (true) {
      try {
          OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
          SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
          // The sample code is used only for testing. For the complete code, see the sample code that provides an example on how to consume data from a saved consumption offset and submit offsets during consumption.
          subscriptionOffset.setSequence(10);
          subscriptionOffset.setTimestamp(100);
          Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
          offsets.put(shardId, subscriptionOffset);
          // Submit the offset.
          datahubClient.commitSubscriptionOffset(Constant.projectName, Constant.topicName, subId, offsets);
      } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());


      }
  }
}

Reset an offset

Note

Syntax: ResetSubscriptionOffsetResult resetSubscriptionOffset(String projectName, String topicName, String shardId, Map offsets)

You can reset an offset to a specific point in time. If multiple records are involved at this point in time, the reset offset points to the first record that is involved at this point in time. After an offset is reset, the offset information changes, and the version ID is updated. If a task that is running submits offsets by using the previous version ID, the SubscriptionOffsetResetException error is reported. You can call the getSubscriptionOffset method to obtain a new version ID.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • subId: the ID of the subscription.

    • offsets: the offset map of shards.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

// Reset an offset.
public static void resetSubscriptionOffset(String projectName, String topicName) throws ParseException {
    List<String> shardIds = Arrays.asList("0");
    // Specify the time to which you want to reset the offset and convert the time to a timestamp.
    String time = "2019-07-09 10:00:00";
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date date = simpleDateFormat.parse(time);

    long timestamp = date.getTime(); // Obtain the timestamp that corresponds to the time.
    long sequence = client.getCursor(projectName, topicName, subId, CursorType.SYSTEM_TIME, timestamp).getSequence();
    SubscriptionOffset offset = new SubscriptionOffset();
    offset.setTimestamp(timestamp);
    offset.setSequence(sequence);
    Map<String, SubscriptionOffset> offsets = new HashMap<String, SubscriptionOffset>();
    for (String shardId : shardIds) {
        offsets.put(shardId, offset);
    }

    try {
        datahubClient.resetSubscriptionOffset(projectName, topicName, subId, offsets);
        System.out.println("reset successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Associate a subscription to consume data in DataHub

Similar to reading data from DataHub, you can associate a subscription to consume data in DataHub. A subscription saves consumption offsets. You can select consumption offsets as needed.

  • Usage notes:

    1. Call the openSubscriptionSession method to initialize an offset and obtain the version ID and session ID of this subscription. You can call this method only once to initialize an offset. If you call this method for more than once, the previous session becomes invalid. In this case, you cannot submit offsets.

    2. Call the getCursor method to obtain the offset of a record in a subscription to consume data. After you consumes the first record, call the getNextCursor method to obtain the offset of a next record and continue to consume data.

    3. Call the commitSubscriptionOffset method to submit an offset. When you submit an offset, the version ID and session ID of this subscription need to be verified. Therefore, make sure that the version ID and session ID are the same as those in the current session.

// The following sample code provides an example on how to consume data from a saved offset and submit offsets during consumption.
public static void example(String projectName, String topicName,String subId) {
  String shardId = "0";
  List<String> shardIds = Arrays.asList("0", "1");
  OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
  SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
  // 1. Obtain the cursor of the record at the current offset. If the record expires or is not consumed, obtain the cursor of the first record within the TTL of the topic.
  String cursor = null;
  // If the sequence number is smaller than 0, the record is not consumed.
  if (subscriptionOffset.getSequence() < 0) {
      // Obtain the cursor of the first record within the TTL of the topic.
      cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
  } else {
      // Obtain the cursor of the next record.
      long nextSequence = subscriptionOffset.getSequence() + 1;
      try {
          // If the SeekOutOfRange error is returned after you obtain the cursor based on the sequence number, the record of the current cursor expires.
          cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (SeekOutOfRangeException e) {
          // Obtain the cursor of the first record within the TTL of the topic.
          cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
      }
  }
  // 2. Read records and save offsets. For example, read tuple records and save an offset each time 1,000 records are read.
  long recordCount = 0L;
  // Read 10 records each time.
  int fetchNum = 10;
  while (true) {
      try {
          GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
          if (getRecordsResult.getRecordCount() <= 0) {
              // If no record can be read, pause the thread for 1,000 ms and continue to read records.
              Thread.sleep(1000);
              continue;
          }
          for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
              // Consume data.
              TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
              System.out.println("field1:" + data.getField("field1") + "\t"
                      + "field2:" + data.getField("field2"));
              // Save the offset after the data is consumed.
              ++recordCount;
              subscriptionOffset.setSequence(recordEntry.getSequence());
              subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
              if (recordCount % 1000 == 0) {
                  // Submit the offset.
                  Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                  offsetMap.put(shardId, subscriptionOffset);
                  datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
                  System.out.println("commit offset successful");
              }
          }
          cursor = getRecordsResult.getNextCursor();
      } catch (SubscriptionOfflineException | OffsetSessionChangedException e) {
          // The subscription session is exited. Offline: The subscription is offline. SessionChange: The subscription is also used on other clients.
          break;
      } catch (OffsetResetedException e) {
          // The offset is reset. You must obtain the offset information of the subscription again. In this example, the sequence number is rest.
          // If the timestamp is rest, you must use the CursorType.SYSTEM_TIME parameter to obtain the cursor.
          subscriptionOffset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
          long nextSequence = subscriptionOffset.getSequence() + 1;
          cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
      } catch (DatahubClientException e) {
          // TODO: Specify whether to exit when an error occurs.
      } catch (Exception e) {
          break;
      }
  }
}

Manage DataConnectors

A DataConnector in DataHub synchronizes streaming data from DataHub to other cloud services. You can use DataConnectors to synchronize data from DataHub topics to MaxCompute, Object Storage Service (OSS), ApsaraDB RDS for MySQL, Tablestore, Elasticsearch, and Function Compute in real-time or near real-time mode. After DataConnectors are configured, the data you write to DataHub can be used in other Alibaba Cloud services.

Create a DataConnector

Note

Syntax: CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, List columnFields, SinkConfig config), or CreateConnectorResult createConnector(String projectName, String topicName, ConnectorType connectorType, long sinkStartTime, List columnFields, SinkConfig config)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of DataConnector that you want to create.

    • columnFields: the fields that you want to synchronize.

    • sinkStartTime: the time at which data starts to be synchronized to DataHub. Unit: milliseconds.

    • config: the configuration details of the specific type of DataConnector.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to MaxCompute:

public static void createConnector(String projectName,String topicName) {
    List<String> columnFields = Arrays.asList("field1", "field2");
    SinkOdpsConfig config = new SinkOdpsConfig() {{
        setEndpoint(Constant.odps_endpoint);
        setProject(Constant.odps_project);
        setTable(Constant.odps_table);
        setAccessId(Constant.odps_accessId);
        setAccessKey(Constant.odps_accessKey);
        setPartitionMode(PartitionMode.SYSTEM_TIME);
        setTimeRange(60);
    }};
    // Specify the partition format.
    SinkOdpsConfig.PartitionConfig partitionConfig = new SinkOdpsConfig.PartitionConfig() {{
        addConfig("ds", "%Y%m%d");
        addConfig("hh", "%H");
        addConfig("mm", "%M");
    }};
    config.setPartitionConfig(partitionConfig);
    try {
        // Create a DataConnector.
        datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ODPS, columnFields, config);
        System.out.println("create  connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());        
    }
}
  • The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to OSS:

    public static void createOssConnector(String projectName,String topicName) {
        List<String> columnFields = Arrays.asList("field1", "field2");
        SinkOssConfig config = new SinkOssConfig() {{
            setAccessId(Constant.oss_accessId);
            setAccessKey(Constant.oss_accessKey);
            setAuthMode(AuthMode.STS);
            setBucket(Constant.oss_bucket);
            setEndpoint(Constant.oss_endpoint);
            setPrefix(Constant.oss_prefix);
            setTimeFormat(Constant.oss_timeFormat);
            setTimeRange(60);

        }};

        try {
            // Create a DataConnector.
            datahubClient.createConnector(projectName,topicName, ConnectorType.SINK_OSS, columnFields, config);
            System.out.println("create  connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }
  • The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to Tablestore:

       public static void createOtsConnector(String projectName,String topicName) {
        List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkOtsConfig config = new SinkOtsConfig() {{
            setAccessId(Constant.ots_accessId);
            setAccessKey(Constant.ots_accessKey);
            setEndpoint(Constant.ots_endpoint);
            setInstance(Constant.ots_instance);
            setTable(Constant.ots_table);
            setAuthMode(AuthMode.AK);
        }};

        try {
            // Create a DataConnector.
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_OTS, columnFields, config);
            System.out.println("create  connector successful");
        }  catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());    
        }
    }
  • The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to Hologres:

       public static void createHoloConnector(String projectName,String topicName) {
        List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkHologresConfig config = new SinkHologresConfig() {{
            setAccessId(Constant.accessId);
            setAccessKey(Constant.accessKey);
            setProjectName(Constant.projectName);
            setTopicName(Constant.topicName);
            setAuthMode(AuthMode.AK);
            setInstanceId(Constant.instanceId);
            // Set the timestamp unit.
            setTimestampUnit(TimestampUnit.MILLISECOND);
        }};

        try {
            // Create a DataConnector.
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_HOLOGRES, columnFields, config);
            System.out.println("create  connector successful");
        }  catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());    
        }
    }
  • The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to Elasticsearch:

    public static void createEsConnector(String projectName,String topicName){
        List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkEsConfig config = new SinkEsConfig() {{
            setEndpoint(Constant.es_endpoint);
            setIdFields(Constant.es_fields);
            setIndex(Constant.es_index);
            setPassword(Constant.es_password);
            setProxyMode(Constant.es_proxyMode);
            setTypeFields(Constant.es_typeFields);
            setUser(Constant.es_user);

        }};

        try {
            // Create a DataConnector.
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ES, columnFields, config);
            System.out.println("create  connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }
  • The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to Function Compute:

   public static void createFcConnector(String projectName,String topicName){
        List<String> columnFields = Arrays.asList("field1", "field2");
        final SinkFcConfig config = new SinkFcConfig() {{
            setEndpoint(Constant.fc_endpoint);
            setAccessId(Constant.fc_accessId);
            setAccessKey(Constant.fc_accessKey);
            setAuthMode(AuthMode.AK);
            setFunction(Constant.fc_function);
            setService(Constant.fc_service);

        }};

        try {
            // Create a DataConnector.
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_FC, columnFields, config);
            System.out.println("create  connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());

        }
    }
  • The following sample code provides an example on how to create a DataConnector to synchronize data from DataHub to a MySQL database:

    public static void createMysqlConnector(String projectName,String topicName){
        List<String> columnFields = Arrays.asList("field1", "field2");

        final SinkMysqlConfig config = new SinkMysqlConfig() {{
         setDatabase( Constant.mysql_database);
         setHost(Constant.mysql_host);
         setInsertMode(InsertMode.OVERWRITE);
         setPassword(Constant.mysql_password);
         setPort(Constant.mysql_port);
         setTable(Constant.mysql_table);
         setUser(Constant.mysql_user);
        }};

        try {
            // Create a DataConnector.
            datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_MYSQL, columnFields, config);
            System.out.println("create  connector successful");
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());  
        }
    }

Delete a DataConnector

Note

Syntax: DeleteConnectorResult deleteConnector(String projectName, String topicName, ConnectorType connectorType)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of DataConnector that you want to delete.

    • columnFields: the fields that you want to synchronize.

    • sinkStartTime: the time at which data starts to be synchronized to DataHub. Unit: milliseconds.

    • config: the configuration details of the specific type of DataConnector.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void deleteConnector(String projectName,String topicName) {
  try {
      datahubClient.deleteConnector(projectName, topicName, ConnectorType.SINK_ODPS);
      System.out.println("delete  connector successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());    
  }
}

Query a DataConnector

Note

Syntax: GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName,topicName,ConnectorType.SINK_ODPS)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of DataConnector that you want to query.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void getConnector(String projectName,String topicName) {
  try {
      GetConnectorResult getConnectorResult = datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS);
      System.out.println(getConnectorResult.getState() + "\t" + getConnectorResult.getSubId());
      for (String fieldName : getConnectorResult.getColumnFields()) {
          System.out.println(fieldName);
      }
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

  }
}

Update a DataConnector

You can update the configurations of a DataConnector.

Note

Syntax: UpdateConnectorResult updateConnector(String projectName, String topicName, ConnectorType connectorType, SinkConfig config)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of DataConnector that you want to update.

    • config: the configuration details of the specific type of DataConnector.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void updateConnector(String projectName,String topicName) {
    SinkOdpsConfig config = (SinkOdpsConfig) datahubClient.getConnector(projectName, topicName, ConnectorType.SINK_ODPS).getConfig();
    // Change the AccessKey pair.
    config.setTimeRange(100);
    config.setAccessId(accessId);
    config.setAccessKey(accessKey);
    // Modify the timestamp type.
    config.setTimestampUnit(ConnectorConfig.TimestampUnit.MICROSECOND);
    try {
        datahubClient.updateConnector(projectName, topicName, ConnectorType.SINK_ODPS, config);
        System.out.println("update  connector successful");
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Update the fields to be synchronized by using a DataConnector

Note

Syntax: UpdateConnectorResult updateConnector(String projectName, String topicName, String connectorId, List columnFields)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • connectorId: the ID of the DataConnector that you want to update.

    • columnFields: the fields that you want to synchronize.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

    public static void updateConnector(String projectName,String topicName) {
        String connectorId = "";
        // The columnField parameter specifies all the fields to be synchronized to the downstream, which include but are not limited to the newly added fields.
        List<String> columnField = new ArrayList<>();
        columnField.add("f1");
        try {
            batchClient.updateConnector(projectName, topicName,connectorId,columnField);
            System.out.println("update  connector successful");
        }  catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }

Update the status of a DataConnector

Note

Syntax: UpdateConnectorStateResult updateConnectorState(String projectName, String topicName, ConnectorType connectorType, ConnectorState connectorState)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of the DataConnector.

    • connectorState: the state of the DataConnector. Valid values: STOPPED and RUNNING.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void updateConnectorState(String projectName,String topicName) {
      try {
          datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
          datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
          System.out.println("update  connector state successful");
      } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
      }
  }

Update the offset of a DataConnector

Note

Syntax: UpdateConnectorOffsetResult updateConnectorOffset(String projectName, String topicName, ConnectorType connectorType, String shardId, ConnectorOffset offset)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of the DataConnector.

    • shardId: the ID of the shard. If the shardID parameter is set to null, the offsets of all shards are updated.

    • offset: the offset of the DataConnector.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void updateConnectorOffset(String projectName,String topicName) {
    ConnectorOffset offset = new ConnectorOffset() {{
        setSequence(10);
        setTimestamp(1000);
    }};
    try {
        // Before you update the offset of a DataConnector, stop the DataConnector.
        datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.STOPPED);
        datahubClient.updateConnectorOffset(projectName, topicName, ConnectorType.SINK_ODPS, shardId, offset);
        datahubClient.updateConnectorState(projectName, topicName, ConnectorType.SINK_ODPS, ConnectorState.RUNNING);
        System.out.println("update  connector offset successful");
    }  catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

List DataConnectors

Note

Syntax: ListConnectorResult listConnector(String projectName, String topicName)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

 public static void listConnector(String projectName,String topicName) {
  try {
       ListConnectorResult listConnectorResult = datahubClient.listConnector(projectName, topicName);
      for (String cName : listConnectorResult.getConnectorNames()) {
          System.out.println(cName);
      }
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
   }
 }

Query the shard status of a DataConnector

Note

Syntax: GetConnectorShardStatusResult getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType), or ConnectorShardStatusEntry getConnectorShardStatus(String projectName, String topicName, ConnectorType connectorType, String shardId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of the DataConnector.

    • shardId: the ID of the shard.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void getConnectorShardStatusByShard(String projectName,String topicName,String shardId) {
    try {
        ConnectorShardStatusEntry connectorShardStatusEntry = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
        System.out.println(connectorShardStatusEntry.getState() + "\t"
                + connectorShardStatusEntry.getCurrSequence() + "\t"
                + connectorShardStatusEntry.getDiscardCount() + "\t"
                + connectorShardStatusEntry.getUpdateTime());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}
public static void getConnectorShardStatus(String projectName,String topicName) {
    try {
        GetConnectorShardStatusResult getConnectorShardStatusResult = datahubClient.getConnectorShardStatus(projectName, topicName, ConnectorType.SINK_ODPS);
        for (Map.Entry<String, ConnectorShardStatusEntry> entry : getConnectorShardStatusResult.getStatusEntryMap().entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue().getState() + "\t"
                    + entry.getValue().getCurrSequence() + "\t"
                    + entry.getValue().getDiscardCount() + "\t"
                    + entry.getValue().getUpdateTime());
        }
    } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
    }
}

Restart a DataConnector

Note

Syntax: ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType), or ReloadConnectorResult reloadConnector(String projectName, String topicName, ConnectorType connectorType, String shardId)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of the DataConnector.

    • shardId: the ID of the shard.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void reloadConnector(String projectName,String topicName ) {
    try {
        datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS);
        System.out.println("reload connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());      
    }
}
public static void reloadConnectorByShard(String projectName,String topicName,String shardId) {
    try {
        datahubClient.reloadConnector(projectName, topicName, ConnectorType.SINK_ODPS, shardId);
        System.out.println("reload connector successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

Query the completion time of a DataConnector

Note

Syntax: GetConnectorDoneTimeResult getConnectorDoneTime(String projectName, String topicName, ConnectorType connectorType)

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType: the type of the DataConnector.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void getDoneTime(String projectName,String topicName ) {
    try {
        GetConnectorDoneTimeResult getConnectorDoneTimeResult = datahubClient.getConnectorDoneTime(projectName, topicName, ConnectorType.SINK_ODPS);
        System.out.println(getConnectorDoneTimeResult.getDoneTime());
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

Update the VPC whitelist

Note

Syntax: UpdateProjectVpcWhitelistResult updateProjectVpcWhitelist(String projectName, String vpcIds)

  • Parameters

    • projectName: the name of the project.

    • vpcids: the IDs of the virtual private clouds (VPCs).

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void updateProjectVpcWhitelist(String projectName) {
    String vpcid = "12345";
    try {
        datahubClient.updateProjectVpcWhitelist(projectName, vpcid);
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
    }
}

Add a field

Note

Syntax: AppendConnectorFieldResult appendConnectorField(String projectName, String topicName, ConnectorType connectorType, String fieldName)

You can add a field to be synchronized by using a DataConnector, provided that a MaxCompute table contains the field.

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • ConnectorType The type of DataConnector.

    • fieldName: the name of the field to be added. The field can be set to null.

  • Errors

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

  • Sample code

public static void appendConnectorField(String projectName,String topicName) {
    String newField = "newfield";
    try {
        // Both the topic and the MaxCompute table contain the field to be added. In addition, the schema for the topic is the same as that of the MaxCompute table.
        datahubClient.appendConnectorField(projectName, topicName, ConnectorType.SINK_ODPS, newField);
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}

Manage multiple objects at a time

We recommend that you use the command-line tool in the DataHub console.