
Data read and write

Last Updated: Aug 06, 2021


  • You can call DataHub SDK to read data from a shard in the CLOSED or ACTIVE state and write data to a shard in the ACTIVE state.

  • You can also import the datahub-client-library dependency, which encapsulates the read and write operations of DataHub SDK for Java. You can use a producer to distribute data writes evenly across shards, or use a consumer to implement collaborative consumption. We recommend collaborative consumption.

Read data from DataHub

You can read data from DataHub in the following ways:

  1. Call DataHub SDK.

  2. Use the collaborative consumption feature.

    Call DataHub SDK

    Step 1: Specify the cursor position

    To read data from a topic, you must specify the shard where the data resides and the cursor position at which the read starts. The following cursor types are supported: OLDEST, LATEST, SEQUENCE, and SYSTEM_TIME.

  • OLDEST: the cursor that points to the earliest data record among valid data.

  • LATEST: the cursor that points to the latest data record.

  • SEQUENCE: the cursor that points to the data record with the specified sequence number.

  • SYSTEM_TIME: the cursor that points to the first data record ingested after the specified timestamp.

    Note

    GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type);
    GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param);

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • type: the type of the cursor.

    • param: the sequence number (for SEQUENCE) or the timestamp in milliseconds (for SYSTEM_TIME).

  • Exceptions

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • SeekOutOfRangeException

  • Sample code

    public static void getcursor() {
      String shardId = "5";
      try {
          /* OLDEST usage example */
          String oldestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
          /* LATEST usage example */
          String latestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getCursor();
          /* SEQUENCE usage example */
          // Obtain the sequence number of the latest data record.
          long seq = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getSequence();
          // Obtain the read positions of the latest ten data records.
          String seqCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
          /* SYSTEM_TIME usage example */
          // Convert the time into a timestamp.
          String time = "2019-07-01 10:00:00";
          SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          long timestamp = 0L;
          try {
              Date date = simpleDateFormat.parse(time);
              timestamp = date.getTime();// Obtain the timestamp of the time.
              //System.out.println(timestamp);
          } catch (ParseException e) {
              System.out.println("invalid time format");
              System.exit(-1);
          }
          // Obtain the read position after the timestamp.
          String timeCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
          System.out.println("get cursor successful");
      } catch (InvalidParameterException e) {
          System.out.println("invalid parameter, please check your parameter");
          System.exit(1);
      } catch (AuthorizationFailureException e) {
          System.out.println("AK error, please check your accessId and accessKey");
          System.exit(1);
      } catch (ResourceNotFoundException e) {
          System.out.println("project or topic or shard not found");
          System.exit(1);
      } catch (SeekOutOfRangeException e) {
          System.out.println("offset invalid or has expired");
          System.exit(1);
      } catch (DatahubClientException e) {
          System.out.println("other error");
          System.out.println(e);
          System.exit(1);
      }
    }

Step 2: Call a method to read data

Note

GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit);
GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit);

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • schema: the schema of data. This parameter is required if you read data from a TUPLE topic.

    • cursor: the position of the cursor for data reading.

    • limit: the maximum number of data records to read.

  • Exceptions

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • ShardSealedException

    • LimitExceededException

  • Sample code

1). Read data from a TUPLE topic

public static void example() {
     // The maximum number of data records to be read at a time.
     int recordLimit = 1000;
     String shardId = "7";
     // Specify the cursor position. In this example, the cursor that points to the earliest data record among valid data is specified.
     // Note: In general, you need to call the getCursor method only for the first read. For subsequent reads, use the cursor returned by the previous getRecords call.
     String cursor = "";
     try {
         cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
     } catch (InvalidParameterException e) {
         System.out.println("invalid parameter, please check your parameter");
         System.exit(1);
     } catch (AuthorizationFailureException e) {
         System.out.println("AK error, please check your accessId and accessKey");
         System.exit(1);
     } catch (ResourceNotFoundException e) {
         System.out.println("project or topic or shard not found");
         System.exit(1);
     } catch (SeekOutOfRangeException e) {
         System.out.println("offset invalid or has expired");
         System.exit(1);
     } catch (DatahubClientException e) {
         System.out.println("other error");
         System.out.println(e);
         System.exit(1);
     }
     while (true) {
         try {
             GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, recordSchema, cursor, recordLimit);
             if (result.getRecordCount() <= 0) {
                  // If no records are read, pause the thread for ten seconds and then continue to read records.
                 Thread.sleep(10000);
                 continue;
             }
             for (RecordEntry entry : result.getRecords()) {
                 TupleRecordData data = (TupleRecordData) entry.getRecordData();
                 System.out.println("field1:" + data.getField("field1") + "\t"
                         + "field2:" + data.getField("field2"));
             }
             // Specify the cursor position that points to the next data record to be read.
             cursor = result.getNextCursor();
         } catch (InvalidCursorException ex) {
             // The cursor position is invalid or has expired. Specify another cursor position as the beginning of consumption.
             cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
         } catch (SeekOutOfRangeException e) {
             System.out.println("offset invalid");
             System.exit(1);
         } catch (ResourceNotFoundException e) {
             System.out.println("project or topic or shard not found");
             System.exit(1);
         } catch (ShardSealedException e) {
             System.out.println("shard is closed, all data has been read");
             System.exit(1);
         } catch (LimitExceededException e) {
             System.out.println("maybe exceed limit, retry");
         } catch (DatahubClientException e) {
             System.out.println("other error");
             System.out.println(e);
             System.exit(1);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
 }

2). Read data from a BLOB topic

public static void example() {
    // The maximum number of data records to be read at a time.
    int recordLimit = 1000;
    String shardId = "7";
    // Specify the cursor position. In this example, the cursor that points to the earliest data record among valid data is specified.
    // Note: In general, you need to call the getCursor method only for the first read. For subsequent reads, use the cursor returned by the previous getRecords call.
    String cursor = "";
    try {
        cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
    } catch (InvalidParameterException e) {
        System.out.println("invalid parameter, please check your parameter");
        System.exit(1);
    } catch (AuthorizationFailureException e) {
        System.out.println("AK error, please check your accessId and accessKey");
        System.exit(1);
    } catch (ResourceNotFoundException e) {
        System.out.println("project or topic or shard not found");
        System.exit(1);
    } catch (SeekOutOfRangeException e) {
        System.out.println("offset invalid or has expired");
        System.exit(1);
    } catch (DatahubClientException e) {
        System.out.println("other error");
        System.out.println(e);
        System.exit(1);
    }
    while (true) {
        try {
            GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.blobTopicName, shardId, cursor, recordLimit);
            if (result.getRecordCount() <= 0) {
                // If no records are read, pause the thread for ten seconds and then continue to read records.
                Thread.sleep(10000);
                continue;
            }
            /* Consume data. */
            for (RecordEntry record: result.getRecords()){
                 BlobRecordData data = (BlobRecordData) record.getRecordData();
                 System.out.println(new String(data.getData()));
            }
            // Specify the cursor position that points to the next data record to be read.
            cursor = result.getNextCursor();
        } catch (InvalidCursorException ex) {
            // The cursor position is invalid or has expired. Specify another cursor position as the beginning of consumption.
            cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
        } catch (SeekOutOfRangeException e) {
            System.out.println("offset invalid");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (ShardSealedException e) {
            System.out.println("shard is closed, all data has been read");
            System.exit(1);
        } catch (LimitExceededException e) {
            System.out.println("maybe exceed limit, retry");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Use the collaborative consumption feature

Step 1: Initialize a consumer

Configurations

  • autoCommit: specifies whether to automatically commit consumption offsets. Default value: true. Offsets are committed at the specified interval in background threads. When automatic commit is enabled, data is considered processed as soon as a read method returns it. If you set this parameter to false, you must acknowledge each data record after it is processed. Background commits cover an offset only after all records before that offset are acknowledged.

  • offsetCommitTimeoutMs: the interval at which consumption offsets are committed. Unit: milliseconds. Valid values: [3000,300000]. Default value: 30000.

  • sessionTimeoutMs: the session timeout period. The heartbeat interval is two-thirds of this value. If no heartbeat message is received from a client within the timeout period, the client is considered stopped and the server reallocates its shards. Unit: milliseconds. Valid values: [60000,180000]. Default value: 60000.

  • fetchSize: the number of data records fetched from a single shard in one asynchronous read. Up to twice this number of records can be cached per shard; when the cache holds fewer than that, another asynchronous read is triggered. The value must be greater than 0. Default value: 1000.
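These parameters can be adjusted on the ConsumerConfig object before the consumer is created. The following sketch assumes that the setter names mirror the parameter names above (setAutoCommit, setOffsetCommitTimeoutMs, setSessionTimeoutMs, and setFetchSize); verify them against the datahub-client-library version that you use.

```java
// Sketch: tuning a consumer configuration. The setter names are assumed
// to mirror the parameter table above; verify them against your SDK version.
ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
config.setAutoCommit(false);            // acknowledge each record manually
config.setOffsetCommitTimeoutMs(30000); // commit offsets every 30 seconds
config.setSessionTimeoutMs(60000);      // heartbeat interval is 2/3 of this value
config.setFetchSize(1000);              // records fetched per shard per async read
Consumer consumer = new Consumer(projectName, topicName, subId, config);
```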

{
        // In this example, the endpoint of the China (Hangzhou) region is used. Use the endpoint of your actual region.
        String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
        String accessId = "<YourAccessKeyId>";
        String accessKey = "<YourAccessKeySecret>";
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";
        String subId = "<YourSubscriptionId>";
        /**
         * 1.Use the collaborative consumption and offset-based data consumption features.
         * Use the collaborative consumption feature:
             If two machines use the same subscription ID to consume data in a topic that contains five shards, you do not need to allocate the shards to the machines.  
            The shards are automatically allocated by the server. If a third machine is added, the server reallocates the shards.
         *
         * Use the offset-based data consumption feature:
             In offset-based data consumption, data is read based on the offset that corresponds to the subscription ID. If the subscription is newly created and does not have an offset, data is read from the start. If you want to read data from a specific point in time, reset the offset of the subscription ID in the DataHub console.
         * 
         * */
        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = new Consumer(projectName, topicName, subId, config);



        /**
         * 2. If you want to use the offset-based data consumption feature instead of the collaborative consumption feature, specify the subscription ID and the shards to be read by the consumer.
         * Do not use the collaborative consumption feature:
             If two machines use the same subscription ID to consume data in a topic that contains five shards, you must allocate the shards to the machines. 
            For example, you can allocate the shards 0, 1, and 2 to Client A and shards 3 and 4 to Client B. If a third machine is added, you must reallocate the shards.
         * 
         * */

        // Client A consumes the data in shards 0, 1, and 2.
        // To use this option, uncomment the following lines and remove the consumer created in option 1.
//        List<String> shardlists = Arrays.asList("0", "1", "2");
//        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
//        Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);


        // Client B consumes the data in shards 3 and 4.
//        List<String> shardlists = Arrays.asList("3", "4");
//        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
//        Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);



        /**
         *3.Do not use the collaborative consumption or offset-based data consumption feature.
         * Do not use the offset-based data consumption feature:
             If you do not use the offset-based data consumption feature, you must use a storage service such as a database to store the timestamp or sequence number of each consumed data record. You must read data based on the recorded information every time.
         * 
         * */
        // To use this option, uncomment the following lines and remove the consumer created in option 1.
//        Map<String, Offset> offsetMap = new HashMap<>();
//        // If both the sequence number and the timestamp are specified but the sequence number is invalid, the cursor position is determined by the timestamp.
//        offsetMap.put("0", new Offset(100, 1548573440756L));
//        // If only the sequence number is specified, the cursor position is determined by the sequence number.
//        offsetMap.put("1", new Offset().setSequence(1));
//        // If only the timestamp is specified, the cursor position is determined by the timestamp.
//        offsetMap.put("2", new Offset().setTimestamp(1548573440756L));
//        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
//        Consumer consumer = new Consumer(projectName, topicName, subId, offsetMap, config);
    }

Step 2: Implement collaborative consumption

import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.SubscriptionOfflineException;
import com.aliyun.datahub.client.exception.SubscriptionOffsetResetException;
import com.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class DatahubReader {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubReader.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO: handle exceptions.
        }
    }

    public static Consumer createConsumer(ConsumerConfig config, String project, String topic, String subId)
    {
        return new Consumer(project, topic, subId, config);
    }

    public static void main(String[] args) {
        String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
        String accessId = "<YourAccessKeyId>";
        String accessKey = "<YourAccessKeySecret>";
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";
        String subId = "<YourSubscriptionId>";

        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = createConsumer(config, projectName, topicName, subId);

        int maxRetry = 3;
        boolean stop = false;
        try {
            while (!stop) {
                try {
                    while (true) {
                        // After the initialization of collaborative consumption, wait about 40 seconds for the server to allocate shards. During this period, data read requests return null.
                        // Automatic submission of offsets is enabled. Each time when the data read method is called, the processing of the data that has been read is considered to be complete and automatically acknowledged.
                        RecordEntry record = consumer.read(maxRetry);

                        // Process data.
                        if (record != null) {
                            TupleRecordData data = (TupleRecordData) record.getRecordData();
                            // Process data based on your own schema. In this example, export the value of the first field.
                            LOG.info("field1: {}", data.getField(0));

                            // Retrieve data based on the field name.
                            // LOG.info("field2: {}", data.getField("field2"));

                            // If automatic offset submission is disabled, each data record must be acknowledged after it is processed.
                            // If automatic offset submission is enabled, the acknowledge operation has no effect.
                            // Requires datahub-client-library 1.1.7 or later.
                            record.getKey().ack();
                        } else {
                            LOG.info("read null");
                        }
                    }
                } catch (SubscriptionOffsetResetException e) {
                    // The offset is reset and the consumer is reinitialized.
                    try {
                        consumer.close();
                        consumer = createConsumer(config, projectName, topicName, subId);
                    } catch (DatahubClientException e1) {
                        // The initialization failed. Perform a retry or throw an exception.
                        LOG.error("create consumer failed", e);
                        throw e;
                    }
                } catch (InvalidParameterException |
                        SubscriptionOfflineException |
                        SubscriptionSessionInvalidException |
                        AuthorizationFailureException |
                        NoPermissionException e) {
                    // The request parameters are invalid.
                    // The subscription is offline.
                    // The subscribed shard is occupied by another client.
                    // The signature is invalid.
                    // No permissions.
                    LOG.error("read failed", e);
                    throw e;
                } catch (DatahubClientException e) {
                    // Handle base class exceptions such as network issues. For example, you can perform a retry.
                    LOG.error("read failed, retry", e);
                    sleep(1000);
                }
            }
        } catch (Throwable e) {
            LOG.error("read failed", e);
        } finally {
            // Make sure that resources are released as expected.
            // Submit acknowledged offsets.
            consumer.close();
        }
    }
}

Write data to DataHub

Call DataHub SDK

DataHub allows you to write data in two ways. You can call the putRecordsByShard method to write data by shard. This method is supported only by servers whose version is 2.12 or later; earlier versions support only the putRecords method. When you call the putRecordsByShard method, specify the shard to which you want to write data. Otherwise, data is written to the first shard in the ACTIVE state by default. The records parameter of both methods is a list in which each element represents a data record. All records in one request must match the type of the topic, either TUPLE or BLOB. If you call the putRecords method, you must check the returned PutRecordsResult to verify whether the write was successful, because failed records do not raise an exception. If you call the putRecordsByShard method, DataHub throws an exception for failed writes. If the version of your server is 2.12 or later, we recommend that you call the putRecordsByShard method to write data.

Note

PutRecordsResult putRecords(String projectName, String topicName, List<RecordEntry> records);
PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List<RecordEntry> records);

  • Parameters

    • projectName: the name of the project.

    • topicName: the name of the topic.

    • shardId: the ID of the shard.

    • records: the data records to write.

  • Exceptions

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • ShardSealedException

    • LimitExceededException
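Because the putRecords method does not raise an exception for records that fail to be written, check the returned PutRecordsResult and re-send the failed records. The following sketch assumes that datahubClient and recordEntries are prepared as in the samples below; the getFailedRecordCount and getFailedRecords accessors are assumed to match your SDK version.

```java
// Sketch: re-send records that putRecords reports as failed.
// Assumes datahubClient and recordEntries are prepared as in the samples below.
List<RecordEntry> pending = recordEntries;
for (int retry = 0; retry < 3 && !pending.isEmpty(); ++retry) {
    PutRecordsResult result = datahubClient.putRecords(Constant.projectName, Constant.topicName, pending);
    if (result.getFailedRecordCount() <= 0) {
        pending = Collections.emptyList();   // all records were written
    } else {
        pending = result.getFailedRecords(); // retry only the failed records
    }
}
```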

1). Write data to a TUPLE topic

// Write data to a TUPLE topic.
public static void tupleExample() {
    String shardId = "9";
    // Obtain the schema.
    recordSchema = datahubClient.getTopic(Constant.projectName, Constant.topicName).getRecordSchema();
    // Generate ten data records.
    List<RecordEntry> recordEntries = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // Set additional attributes for each data record, such as the IP address and hostname. Additional attributes are optional. If you do not set additional attributes, data writes are not affected.
        recordEntry.addAttribute("key1", "value1");
        TupleRecordData data = new TupleRecordData(recordSchema);
        data.setField("field1", "HelloWorld");
        data.setField("field2", 1234567);
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
    }
    try {
        // The putRecordsByShard method is supported only when the version of the server is 2.12 or later. For earlier versions, call the putRecords method.
        //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
        datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
        System.out.println("write data successful");
    } catch (InvalidParameterException e) {
        System.out.println("invalid parameter, please check your parameter");
        System.exit(1);
    } catch (AuthorizationFailureException e) {
        System.out.println("AK error, please check your accessId and accessKey");
        System.exit(1);
    } catch (ResourceNotFoundException e) {
        System.out.println("project or topic or shard not found");
        System.exit(1);
    } catch (ShardSealedException e) {
        System.out.println("shard status is CLOSED, can not write");
        System.exit(1);
    } catch (DatahubClientException e) {
        System.out.println("other error");
        System.out.println(e);
        System.exit(1);
    }
}

2). Write data to a BLOB topic

// Write data to a BLOB topic.
public static void blobExample() {
    // Generate ten data records.
    List<RecordEntry> recordEntries = new ArrayList<>();
    String shardId = "4";
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // Set additional attributes for each data record.
        recordEntry.addAttribute("key1", "value1");
        BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
    }
    while (true) {
        try {
            // The putRecordsByShard method is supported only when the version of the server is 2.12 or later. For earlier versions, call the putRecords method.
            //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
            datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
            System.out.println("write data successful");
            break;
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (ShardSealedException e) {
            System.out.println("shard status is CLOSED, can not write");
            System.exit(1);
        } catch (LimitExceededException e) {
            System.out.println("maybe qps exceed limit, retry");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
}

Use a producer

Step 1: Import dependencies

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.19.0-public</version>
</dependency>
<dependency>
      <groupId>com.aliyun.datahub</groupId>
      <artifactId>datahub-client-library</artifactId>
      <version>1.1.12-public</version>
</dependency>

Step 2: Write code

import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.MalformedRecordException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.Producer;
import com.aliyun.datahub.exception.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DatahubWriter {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubWriter.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO: handle exceptions.
        }
    }

    private static List<RecordEntry> genRecords(RecordSchema schema) {
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int cnt = 0; cnt < 10; ++cnt) {
            RecordEntry entry = new RecordEntry();

            entry.addAttribute("key1", "value1");
            entry.addAttribute("key2", "value2");

            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "testValue");
            data.setField("field2", 1);

            entry.setRecordData(data);
            recordEntries.add(entry);
        }
        return recordEntries;
    }

    private static void sendRecords(Producer producer, List<RecordEntry> recordEntries) {
        int maxRetry = 3;
        while (true) {
            try {
                // Write data to a shard that is automatically selected by DataHub.
                producer.send(recordEntries, maxRetry);

                // Write data to the shard whose ID is 0.
                // producer.send(recordEntries, "0", maxRetry);
                LOG.info("send records: {}", recordEntries.size());
                break;
            } catch (MalformedRecordException e) {
                // Specify whether to ignore the error or throw an exception when the data record to be written is in an invalid format.
                LOG.error("write fail", e);
                throw e;
            } catch (InvalidParameterException |
                    AuthorizationFailureException |
                    NoPermissionException e) {
                // The request parameters are invalid.
                // The signature is invalid.
                // No permissions.
                LOG.error("write fail", e);
                throw e;
            } catch (ShardNotFoundException e) {
                // Handle the exception that the specified shard does not exist. If you write data to a shard that is automatically selected by DataHub, you do not need to handle this exception.
                LOG.error("write fail", e);
                sleep(1000);
            } catch (ResourceNotFoundException e) {
                // Handle the exception that the specified project, topic, or shard does not exist.
                LOG.error("write fail", e);
                throw e;
            } catch (DatahubClientException e) {
                // Handle base class exceptions such as network issues. For example, you can perform a retry.
                LOG.error("write fail", e);
                sleep(1000);
            }
        }
    }

    public static void main(String[] args) {
        // In this example, the endpoint of the China (Hangzhou) region is used. Use the endpoint of your actual region.
        String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
        String accessId = "<YourAccessKeyId>";
        String accessKey = "<YourAccessKeySecret>";
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";


        // Obtain the topic schema. This sample assumes that datahubClient is a DatahubClient instance that you have already initialized.
        RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();



        ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
        Producer producer = new Producer(projectName, topicName, config);

        // Configure a loop based on your business scenario.
        boolean stop = false;
        try {
            while (!stop) {
                List<RecordEntry> recordEntries = genRecords(schema);
                sendRecords(producer, recordEntries);
            }
        } finally {
            // Make sure that resources are released as expected.
            producer.close();
        }
    }
}

Write data in multiple ways

If the version of your server is earlier than 2.12, you can write data to DataHub only by calling the putRecords method. The RecordEntry class provides three attributes: shardId, partitionKey, and hashKey. You can use these attributes to specify a shard to which data is to be written. If the ExpandMode parameter of the topic is set to ONLY_EXTEND, the partitionKey and hashKey attributes are not supported.

Note

If the version of your server is 2.12 or later, we recommend that you call the putRecordsByShard method to write data. This avoids performance loss caused by partitions on the server.

1). Write data based on a specified shard ID. We recommend that you write data in this way. The following code provides an example:

RecordEntry entry = new RecordEntry();
entry.setShardId("0");

2). Write data based on a specified hash key. Specify a 128-bit MD5 value. DataHub determines the shard to which data is to be written based on the values of the BeginHashKey and EndHashKey parameters of shards. The following code provides an example:

RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");

3). Write data based on a specified partition key. Specify a parameter of the STRING type as the partition key. DataHub determines the shard to which data is to be written based on the MD5 value of the string and the values of the BeginHashKey and EndHashKey parameters of shards. The following code provides an example:

RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");
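DataHub computes the MD5 value of the partition key on the server side, so your code does not need to hash the string itself. To illustrate the relationship between the two ways, the following self-contained sketch computes the same kind of 128-bit MD5 value as a 32-character hexadecimal string, which is the format that the setHashKey method in the previous example expects:

```java
import java.security.MessageDigest;

public class PartitionKeyHash {
    // Compute the 128-bit MD5 digest of a partition key and format it as a
    // 32-character uppercase hexadecimal string, the same format as a hash key.
    public static String md5HashKey(String partitionKey) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(partitionKey.getBytes("UTF-8"));
            StringBuilder sb = new StringBuilder(32);
            for (byte b : digest) {
                sb.append(String.format("%02X", b));
            }
            return sb.toString();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // A 128-bit digest is always 32 hexadecimal characters.
        System.out.println(PartitionKeyHash.md5HashKey("TestPartitionKey"));
    }
}
```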

Note

A consumer or producer instance is not thread-safe and must not be accessed by multiple threads at the same time. If you want to use multiple threads, create a separate consumer or producer for each thread.
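One way to follow this rule is to create one producer per thread. The following sketch assumes the endpoint, credential, project, topic, and genRecords variables from the earlier producer sample; each thread builds, uses, and closes its own Producer instance instead of sharing one.

```java
// Sketch: one Producer per thread. Assumes endpoint, accessId, accessKey,
// projectName, topicName, schema, and genRecords(schema) from the producer sample above.
ExecutorService pool = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
    pool.submit(() -> {
        // Each thread creates its own config and producer; instances are never shared.
        ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
        Producer producer = new Producer(projectName, topicName, config);
        try {
            producer.send(genRecords(schema), 3);
        } finally {
            producer.close();
        }
    });
}
pool.shutdown();
```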