All Products
Search
Document Center

Create a subscription

Last Updated: Aug 25, 2021

Overview of the subscription feature

Resumable consumption is required in scenarios where you consume data in DataHub topics and want to resume the consumption from the time when your application fails. If you need to resume consumption, you must save the current consumption offset and make sure that the service used to save consumption offsets supports high availability. This increases the complexity of application development. The subscription feature of DataHub allows you to save consumption offsets to the server to resolve the preceding problem. You need only to enable this feature and add a few lines of code to your application to obtain a consumption offset maintenance service with high availability. In addition, the subscription feature allows you to reset consumption offsets. This ensures that the data can be consumed at least once. For example, an error occurs when your application processes the data consumed in a specific time period and you want to consume the data again. In this case, you can reset the consumption offset without restarting the application. Your application automatically consumes data from the specified consumption offset.

Create a subscription

Make sure that your account has the permissions to subscribe to topics in a specific project. For more information, see Access control. Perform the following steps to create a subscription:

  • Go to the details page of a topic. Click Subscription in the upper-right corner.

8

  • In the Create Subscription panel, set the parameters as required. Then, click Create.

    • Application: the name of the application for which you want to create the subscription.

    • Description: the description of the subscription.

    9

On the Subscription List tab, find the created subscription and click the search icon in the Consumer Offset column to view the consumption status of all shards.

10

2. Example

The subscription feature allows you to save consumption offsets. You can use the read and write capabilities of DataHub with the capability of saving consumption offsets in scenarios where you must save consumption offsets after data is read. For more information about the read and write capabilities of DataHub, see DataHub SDK for Java.

  • Sample code

// The following sample code provides an example on how to consume data from a saved consumption offset and submit consumption offsets during consumption.
public void offset_consumption(int maxRetry) {
    String endpoint = "<YourEndPoint>";
    String accessId = "<YourAccessId>";
    String accessKey = "<YourAccessKey>";
    String projectName = "<YourProjectName>";
    String topicName = "<YourTopicName>";
    String subId = "<YourSubId>";
    String shardId = "0";
    List<String> shardIds = Arrays.asList(shardId);
    // Create a DataHub client.
    DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
            .setDatahubConfig(
                    new DatahubConfig(endpoint,
                            // Specify whether to enable binary data transmission. The server of V2.12 or later supports binary data transmission.
                            new AliyunAccount(accessId, accessKey), true))
            .build();
    RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
    OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
    SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
    // 1. Obtain the cursor of the record at the current consumption offset. If the record expired or the record is not consumed, obtain the cursor of the first record within the time-to-live (TTL) period of the topic.
    String cursor = "";
    // If the sequence number is smaller than 0, the record is not consumed.
    if (subscriptionOffset.getSequence() < 0) {
        // Obtain the cursor of the first record within the TTL period of the topic.
        cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
    } else {
        // Obtain the cursor of the next record.
        long nextSequence = subscriptionOffset.getSequence() + 1;
        try {
            // If the SeekOutOfRange error is returned after you obtain the cursor based on the sequence number, the record expired.
            cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
        } catch (SeekOutOfRangeException e) {
            // Obtain the cursor of the first record within the TTL period of the topic.
            cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
        }
    }
    // 2. Read records and save consumption offsets. In this example, you read records of the TUPLE type and save consumption offsets each time 1,000 records are read.
    long recordCount = 0L;
    // Read 1,000 records each time.
    int fetchNum = 1000;
    int retryNum = 0;
    int commitNum = 1000;
    while (retryNum < maxRetry) {
        try {
            GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
            if (getRecordsResult.getRecordCount() <= 0) {
                // If no records can be read, pause the thread for 1 second and continue to read records.
                System.out.println("no data, sleep 1 second");
                Thread.sleep(1000);
                continue;
            }
            for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
                // Consume data.
                TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
                System.out.println("field1:" + data.getField("field1") + "\t"
                        + "field2:" + data.getField("field2"));
                // Save the consumption offset after the data is consumed.
                recordCount++;
                subscriptionOffset.setSequence(recordEntry.getSequence());
                subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
                // commit offset every 1000 records
                if (recordCount % commitNum == 0) {
                    // Submit the consumption offset.
                    Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
                    offsetMap.put(shardId, subscriptionOffset);
                    datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
                    System.out.println("commit offset successful");
                }
            }
            cursor = getRecordsResult.getNextCursor();
        } catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
            // The subscription session is exited. Offline: The subscription is offline; SessionChange: The subscription is being consumed by other clients.
            e.printStackTrace();
            throw e;
        } catch (SubscriptionOffsetResetException e) {
            // The consumption offset is reset. You must obtain the version information of the consumption offset again.
            SubscriptionOffset offset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
            subscriptionOffset.setVersionId(offset.getVersionId());
            // After the consumption offset is reset, you must obtain the cursor of the record at the consumption offset again. The method used to obtain the cursor depends on the method used to reset the consumption offset. 
            // If both the sequence number and timestamp are specified to reset the consumption offset, you can obtain the cursor based on the sequence number or the timestamp.
            // If only the sequence number is specified to reset the consumption offset, you can obtain the cursor based only on the sequence number. 
            // If only the timestamp is specified to reset the consumption offset, you can obtain the cursor based only on the timestamp.
            // In most cases, preferentially obtain the cursor based on the sequence number. If the cursor failed to be obtained based on the sequence number or the timestamp, obtain the cursor of the earliest record.
            cursor = null;
            if (cursor == null) {
                try {
                    long nextSequence = offset.getSequence() + 1;
                    cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
                    System.out.println("get cursor successful");
                } catch (DatahubClientException exception) {
                    System.out.println("get cursor by SEQUENCE failed, try to get cursor by SYSTEM_TIME");
                }
            }
            if (cursor == null) {
                try {
                    cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, offset.getTimestamp()).getCursor();
                    System.out.println("get cursor successful");
                } catch (DatahubClientException exception) {
                    System.out.println("get cursor by SYSTEM_TIME failed, try to get cursor by OLDEST");
                }
            }
            if (cursor == null) {
                try {
                    cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
                    System.out.println("get cursor successful");
                } catch (DatahubClientException exception) {
                    System.out.println("get cursor by OLDEST failed");
                    System.out.println("get cursor failed!!");
                    throw e;
                }
            }
        } catch (LimitExceededException e) {
            // limit exceed, retry
            e.printStackTrace();
            retryNum++;
        } catch (DatahubClientException e) {
            // other error, retry
            e.printStackTrace();
            retryNum++;
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }
}
  • Result

1. When you start your application for the first time, the application consumes data from the earliest record. When the application is running, you can refresh the Subscription List tab in the DataHub console. The consumption offsets of the shards move forward. 2. If you reset the consumption offset by clicking Reset in the DataHub console during the consumption, your application automatically detects the change of the consumption offset and consumes data from the specified consumption offset. When the application catches OffsetResetedException, the application calls the getSubscriptionOffset method to query the latest consumption offset from the server. Then, the application can consume data from the latest consumption offset. 3. Note that a shard in a subscription cannot be consumed by multiple threads or processes at the same time. Otherwise, the consumption offset submitted by a thread is overwritten by that submitted by another thread, and the server cannot determine to which thread the saved consumption offset belongs. In this case, the server throws OffsetSessionChangedException. We recommend that you exit the subscription session to check whether data is repeatedly consumed if this exception is caught.