すべてのプロダクト
Search
ドキュメントセンター

DataHub:データの読み取りと書き込み

最終更新日:Jan 22, 2025

データの読み取りと書き込み

  • DataHub SDK を呼び出して、CLOSED または ACTIVE 状態のシャードからデータを読み取り、ACTIVE 状態のシャードにデータを書き込むことができます。

  • また、DataHub SDK for Java でデータの読み取りおよび書き込み操作をカプセル化する datahub-client-library 依存関係をインポートすることもできます。プロデューサーを使用してシャードへの均等なデータ書き込みを実装したり、コンシューマーを使用して共同消費を実装したりできます。共同消費の実装をお勧めします

DataHub からデータを読み取る

DataHub からデータを読み取るには、次の方法があります。

  1. DataHub SDK を呼び出します。

  2. 共同消費機能を使用します。

    DataHub SDK の呼び出し

    手順 1: カーソル位置を指定する

    Topic からデータを読み取るには、データが存在するシャードを指定する必要があります。また、データ読み取りのカーソル位置を指定する必要もあります。カーソル位置は、OLDEST、LATEST、SEQUENCE、SYSTEM_TIME のいずれかの方法で指定できます。

  • OLDEST: 有効なデータの中で最も古いデータレコードを指すカーソル。

  • LATEST: 最新のデータレコードを指すカーソル。

  • SEQUENCE: 指定されたシーケンス番号を持つデータレコードを指すカーソル。

  • SYSTEM_TIME: 指定されたタイムスタンプ以降に取り込まれた最初のデータレコードを指すカーソル。

    説明

    GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type);GetCursorResult getCursor(String projectName, String topicName, String shardId, CursorType type, long param);

  • パラメーター

    • projectName: プロジェクト名。

    • topicName: Topic 名。

    • shardId: シャードの ID。

    • CursorType: カーソルの位置。

  • 例外

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • SeekOutOfRangeException

  • サンプルコード

    public static void getcursor() {
      String shardId = "5";
      try {
          /* OLDEST 使用例 */
          String oldestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
          /* LATEST 使用例 */
          String latestCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getCursor();
          /* SEQUENCE 使用例 */
          // 最新のデータレコードのシーケンス番号を取得します。
          long seq = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.LATEST).getSequence();
          // 最新の 10 個のデータレコードの読み取り位置を取得します。
          String seqCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SEQUENCE, seq - 9).getCursor();
          /* SYSTEM_TIME 使用例 */
          // 時間をタイムスタンプに変換します。
          String time = "2019-07-01 10:00:00";
          SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          long timestamp = 0L;
          try {
              Date date = simpleDateFormat.parse(time);
              timestamp = date.getTime();// 時間のタイムスタンプを取得します。
              //System.out.println(timestamp);
          } catch (ParseException e) {
              System.exit(-1);
          }
          // タイムスタンプ以降の読み取り位置を取得します。
          String timeCursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.SYSTEM_TIME, timestamp).getCursor();
          System.out.println("get cursor successful");
      } catch (InvalidParameterException e) {
          System.out.println("invalid parameter, please check your parameter");
          System.exit(1);
      } catch (AuthorizationFailureException e) {
          System.out.println("AK error, please check your accessId and accessKey");
          System.exit(1);
      } catch (ResourceNotFoundException e) {
          System.out.println("project or topic or shard not found");
          System.exit(1);
      } catch (SeekOutOfRangeException e) {
          System.out.println("offset invalid or has expired");
          System.exit(1);
      } catch (DatahubClientException e) {
          System.out.println("other error");
          System.out.println(e);
          System.exit(1);
      }
    }

手順 2: データを読み取るメソッドを呼び出す

説明

GetRecordsResult getRecords(String projectName, String topicName, String shardId, String cursor, int limit);GetRecordsResult getRecords(String projectName, String topicName, String shardId, RecordSchema schema, String cursor, int limit);

  • パラメーター

    • projectName: プロジェクト名。

    • topicName: Topic 名。

    • shardId: シャードの ID。

    • schema: データのスキーマ。TUPLE Topic からデータを読み取る場合は、このパラメーターが必要です。

    • cursor: データ読み取りのカーソルの位置。

    • limit: 読み取るデータレコードの最大数。

  • 例外

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • ShardSealedException

    • LimitExceededException

  • サンプルコード

1). TUPLE Topic からデータを読み取る

public static void example() {
     // 一度に読み取るデータレコードの最大数。
     int recordLimit = 1000;
     String shardId = "7";
     // カーソル位置を指定します。この例では、有効なデータの中で最も古いデータレコードを指すカーソルが指定されています。
     // 注: 通常、getCursor メソッドは最初のデータ読み取り操作に対してのみ呼び出す必要があります。後続のデータ読み取り操作では、getRecords メソッドを呼び出してカーソル位置を指定します。
     String cursor = "";
     try {
         cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
     } catch (InvalidParameterException e) {
         System.out.println("invalid parameter, please check your parameter");
         System.exit(1);
     } catch (AuthorizationFailureException e) {
         System.out.println("AK error, please check your accessId and accessKey");
         System.exit(1);
     } catch (ResourceNotFoundException e) {
         System.out.println("project or topic or shard not found");
         System.exit(1);
     } catch (SeekOutOfRangeException e) {
         System.out.println("offset invalid or has expired");
         System.exit(1);
     } catch (DatahubClientException e) {
         System.out.println("other error");
         System.out.println(e);
         System.exit(1);
     }
     while (true) {
         try {
             GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.topicName, shardId, recordSchema, cursor, recordLimit);
             if (result.getRecordCount() <= 0) {
                 // レコードが読み取られない場合は、スレッドを 1 秒間一時停止してから、レコードの読み取りを続行します。
                 Thread.sleep(10000);
                 continue;
             }
             for (RecordEntry entry : result.getRecords()) {
                 TupleRecordData data = (TupleRecordData) entry.getRecordData();
                 System.out.println("field1:" + data.getField("field1") + "\t"
                         + "field2:" + data.getField("field2"));
             }
             // 読み取る次のデータレコードを指すカーソル位置を指定します。
             cursor = result.getNextCursor();
         } catch (InvalidCursorException ex) {
             // カーソル位置が無効であるか、期限切れです。別のカーソル位置を消費の開始点として指定します。
             cursor = datahubClient.getCursor(Constant.projectName, Constant.topicName, shardId, CursorType.OLDEST).getCursor();
         } catch (SeekOutOfRangeException e) {
             System.out.println("offset invalid");
             System.exit(1);
         } catch (ResourceNotFoundException e) {
             System.out.println("project or topic or shard not found");
             System.exit(1);
         } catch (ShardSealedException e) {
             System.out.println("shard is closed, all data has been read");
             System.exit(1);
         } catch (LimitExceededException e) {
             System.out.println("maybe exceed limit, retry");
         } catch (DatahubClientException e) {
             System.out.println("other error");
             System.out.println(e);
             System.exit(1);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
 }

2). BLOB Topic からデータを読み取る

public static void example() {
    // 一度に読み取るデータレコードの最大数。
    int recordLimit = 1000;
    String shardId = "7";
    // カーソル位置を指定します。この例では、有効なデータの中で最も古いデータレコードを指すカーソルが指定されています。
    // 注: 通常、getCursor メソッドは最初のデータ読み取り操作に対してのみ呼び出す必要があります。後続のデータ読み取り操作では、getRecords メソッドを呼び出してカーソル位置を指定します。
    String cursor = "";
    try {
        cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
    } catch (InvalidParameterException e) {
        System.out.println("invalid parameter, please check your parameter");
        System.exit(1);
    } catch (AuthorizationFailureException e) {
        System.out.println("AK error, please check your accessId and accessKey");
        System.exit(1);
    } catch (ResourceNotFoundException e) {
        System.out.println("project or topic or shard not found");
        System.exit(1);
    } catch (SeekOutOfRangeException e) {
        System.out.println("offset invalid or has expired");
        System.exit(1);
    } catch (DatahubClientException e) {
        System.out.println("other error");
        System.out.println(e);
        System.exit(1);
    }
    while (true) {
        try {
            GetRecordsResult result = datahubClient.getRecords(Constant.projectName, Constant.blobTopicName, shardId, recordSchema, cursor, recordLimit);
            if (result.getRecordCount() <= 0) {
                // レコードが読み取られない場合は、スレッドを 1 秒間一時停止してから、レコードの読み取りを続行します。
                Thread.sleep(10000);
                continue;
            }
            /* データを消費します。 */
            for (RecordEntry record: result.getRecords()){
                 BlobRecordData data = (BlobRecordData) record.getRecordData();
                 System.out.println(new String(data.getData()));
            }
            // 読み取る次のデータレコードを指すカーソル位置を指定します。
            cursor = result.getNextCursor();
        } catch (InvalidCursorException ex) {
            // カーソル位置が無効であるか、期限切れです。別のカーソル位置を消費の開始点として指定します。
            cursor = datahubClient.getCursor(Constant.projectName, Constant.blobTopicName, shardId, CursorType.OLDEST).getCursor();
        } catch (SeekOutOfRangeException e) {
            System.out.println("offset invalid");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (ShardSealedException e) {
            System.out.println("shard is closed, all data has been read");
            System.exit(1);
        } catch (LimitExceededException e) {
            System.out.println("maybe exceed limit, retry");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

共同消費機能の使用

手順 1: コンシューマーを初期化する

構成

パラメーター

説明

autoCommit

消費オフセットを自動的に送信するかどうかを指定します。デフォルト値: true。消費オフセットは、バックグラウンドスレッドで指定された時間間隔で自動的に送信されます。データ読み取りメソッドが呼び出されると、消費オフセットが自動的に送信され、読み取りデータの処理は完了したと見なされます。このパラメーターを false に設定した場合、各データレコードは処理完了後に確認応答する必要があります。バックグラウンドでのオフセット送信により、オフセットより前のすべてのデータレコードが確認応答されます。

offsetCommitTimeoutMs

消費オフセットが送信される時間間隔。単位: ミリ秒。有効な値: [3000,300000]。デフォルト値: 30000。

sessionTimeoutMs

セッションタイムアウト期間。ハートビート間隔はこの値の 3 分の 2 です。ハートビート間隔内にクライアントからハートビートメッセージが受信されない場合、クライアントは停止したと見なされます。サーバーは占有されたシャードを再割り当てします。単位: ミリ秒。有効な値: [60000,180000]。デフォルト値: 60000。

fetchSize

非同期データ読み取り中に単一シャードから読み取ることができるデータレコードのサイズ。キャッシュされたデータレコードの最大サイズは、この値の 2 倍です。キャッシュされたデータレコードの実際のサイズがこの値の 2 倍未満の場合、非同期データ読み取りがトリガーされます。このパラメーターの値は 0 より大きい必要があります。デフォルト値: 1000。

{
        // この例では、中国 (杭州) リージョンのエンドポイントを使用しています。実際のリージョンのエンドポイントを使用してください。
        String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
        String accessId = "<YourAccessKeyId>";
        String accessKey = "<YourAccessKeySecret>";
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";
        String subId = "<YourSubscriptionId>";
        /**
         * 1. 共同消費とオフセットベースのデータ消費機能を使用します。
         * 共同消費機能の使用:
             2 台のマシンが同じサブスクリプション ID を使用して、5 つのシャードを含む Topic のデータを使用する場合、シャードをマシンに割り当てる必要はありません。
            シャードはサーバーによって自動的に割り当てられます。3 台目のマシンが追加されると、サーバーはシャードを再割り当てします。
         *
         * オフセットベースのデータ消費機能の使用:
             オフセットベースのデータ消費では、サブスクリプション ID に対応するオフセットに基づいてデータが読み取られます。サブスクリプションが新しく作成され、オフセットがない場合、データは最初から読み取られます。特定の時点からデータを読み取る場合は、DataHub コンソールでサブスクリプション ID のオフセットをリセットします。
         *
         * */
        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = new Consumer(projectName, topicName, subId, config);



        /**
         * 2. オフセットベースのデータ消費機能を共同消費機能の代わりに使用する場合、コンシューマーが読み取るサブスクリプション ID とシャードを指定します。
         * 共同消費機能を使用しない:
             2 台のマシンが同じサブスクリプション ID を使用して、5 つのシャードを含む Topic のデータを使用する場合、シャードをマシンに割り当てる必要があります。
            たとえば、シャード 0、1、2 をクライアント A に、シャード 3 と 4 をクライアント B に割り当てることができます。3 台目のマシンが追加された場合は、シャードを再割り当てする必要があります。
         *
         * */

        // クライアント A はシャード 0、1、2 のデータを使用します。
        List<String> shardlists = Arrays.asList("0", "1", "2");
        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);


        // クライアント B はシャード 3 と 4 のデータを使用します。
//        List<String> shardlists = Arrays.asList("3", "4");
//        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
//        Consumer consumer = new Consumer(projectName, topicName, subId, shardlists, config);



        /**
         *3. 共同消費またはオフセットベースのデータ消費機能を使用しません。
         * オフセットベースのデータ消費機能を使用しない:
             オフセットベースのデータ消費機能を使用しない場合は、データベースなどのストレージサービスを使用して、消費された各データレコードのタイムスタンプまたはシーケンス番号を保存する必要があります。毎回、記録された情報に基づいてデータを読み取る必要があります。
         *
         * */
        Map<String, Offset> offsetMap = new HashMap<>();
// シーケンス番号とタイムスタンプの両方が指定されているが、シーケンス番号が無効な場合は、タイムスタンプに基づいてカーソル位置を指定します。
        offsetMap.put("0", new Offset(100, 1548573440756L));
// シーケンス番号のみが指定されている場合は、シーケンス番号に基づいてカーソル位置を指定します。
        offsetMap.put("1", new Offset().setSequence(1));
// タイムスタンプのみが指定されている場合は、タイムスタンプに基づいてカーソル位置を指定します。
        offsetMap.put("2", new Offset().setTimestamp(1548573440756L));
        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = new Consumer(projectName, topicName, subId, offsetMap, config);
    }

手順 2: 共同消費を実装する

import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.SubscriptionOfflineException;
import com.aliyun.datahub.client.exception.SubscriptionOffsetResetException;
import com.aliyun.datahub.client.exception.SubscriptionSessionInvalidException;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class DatahubReader {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubReader.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO: 例外を処理します。
        }
    }

    public static Consumer createConsumer(ConsumerConfig config, String project, String topic, String subId)
    {
        return new Consumer(project, topic, subId, config);
    }

    public static void main(String[] args) {
        String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
        String accessId = "<YourAccessKeyId>";
        String accessKey = "<YourAccessKeySecret>";
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";
        String subId = "<YourSubscriptionId>";

        ConsumerConfig config = new ConsumerConfig(endpoint, accessId, accessKey);
        Consumer consumer = createConsumer(config, projectName, topicName, subId);

        int maxRetry = 3;
        boolean stop = false;
        try {
            while (!stop) {
                try {
                    while (true) {
                        // 共同消費の初期化後、サーバーがシャードを割り当てるまで約 40 秒待ちます。この期間中、データ読み取りリクエストの戻り結果は NULL になります。
                        // オフセットの自動送信が有効になっています。データ読み取りメソッドが呼び出されるたびに、読み取られたデータの処理は完了し、自動的に確認応答されたと見なされます。
                        RecordEntry record = consumer.read(maxRetry);

                        // データを処理します。
                        if (record != null) {
                            TupleRecordData data = (TupleRecordData) record.getRecordData();
                            // 独自のスキーマに基づいてデータを処理します。この例では、最初のフィールドの値をエクスポートします。
                            LOG.info("field1: {}", data.getField(0));

                            // フィールド名に基づいてデータを取得します。
                            // LOG.info("field2: {}", data.getField("field2"));

                            // オフセットの自動送信が無効になっています。各データレコードは、処理の完了後に確認応答する必要があります。
                            // オフセットの自動送信が有効になっています。確認応答操作は実行されません。
                            // バージョンは 1.1.7 以降である必要があります。
                            record.getKey().ack();
                        } else {
                            LOG.info("read null");
                        }
                    }
                } catch (SubscriptionOffsetResetException e) {
                    // オフセットがリセットされ、コンシューマーが再初期化されます。
                    try {
                        consumer.close();
                        consumer = createConsumer(config, projectName, topicName, subId);
                    } catch (DatahubClientException e1) {
                        // 初期化に失敗しました。再試行するか、例外をスローします。
                        LOG.error("create consumer failed", e);
                        throw e;
                    }
                } catch (InvalidParameterException |
                        SubscriptionOfflineException |
                        SubscriptionSessionInvalidException |
                        AuthorizationFailureException |
                        NoPermissionException e) {
                    // リクエストパラメーターが無効です。
                    // サブスクリプションはオフラインです。
                    // サブスクライブされたシャードは別のクライアントによって占有されています。
                    // 署名が無効です。
                    // 権限がありません。
                    LOG.error("read failed", e);
                    throw e;
                } catch (DatahubClientException e) {
                    // ネットワークの問題など、基底クラスの例外を処理します。たとえば、再試行を実行できます。
                    LOG.error("read failed, retry", e);
                    sleep(1000);
                }
            }
        } catch (Throwable e) {
            LOG.error("read failed", e);
        } finally {
            // リソースが想定どおりに解放されていることを確認します。
            // 確認応答されたオフセットを送信します。
            consumer.close();
        }
    }
}

DataHub にデータを書き込む

DataHub SDK の呼び出し

putRecordsByShardResult メソッドは、バージョン 2.12 以降のサーバーでサポートされています。以前のバージョンは、putRecords メソッドのみをサポートしています。putRecordsByShardResult メソッドを呼び出すときは、データを書き込むシャードを指定します。指定しない場合、デフォルトでは ACTIVE 状態の最初のシャードにデータが書き込まれます。前述の 2 つのメソッドの records リクエストパラメーターの値はリストです。リストの各要素はデータレコードを表します。すべてのデータレコードは、TUPLE または BLOB のいずれかの同じタイプの Topic に書き込む必要があります。DataHub では、2 つの方法でデータを書き込むことができます。putRecordsByShard メソッドを呼び出して、シャードごとにデータを書き込むことができます。これには、サーバーのバージョンが 2.12 以降である必要があります。また、putRecords メソッドを呼び出してデータを書き込むこともできます。putRecords メソッドを呼び出してデータを書き込む場合は、PutRecordsResult メソッドを呼び出して、データ書き込みが成功したかどうかを確認する必要があります。putRecordsByShard メソッドを呼び出してデータを書き込む場合、DataHub はデータ書き込みの失敗に対して例外をスローします。サーバーのバージョンが 2.12 以降の場合は、putRecordsByShard メソッドを呼び出してデータを書き込むことをお勧めします

説明

PutRecordsResult putRecords(String projectName, String topicName, List records);PutRecordsByShardResult putRecordsByShard(String projectName, String topicName, String shardId, List records);

  • パラメーター

    • projectName: プロジェクト名。

    • topicName: Topic 名。

    • shardId: シャードの ID。

    • records: 書き込むデータレコード。

  • 例外

    • DatahubClientException

    • InvalidParameterException

    • AuthorizationFailureException

    • ResourceNotFoundException

    • ShardSealedException

    • LimitExceededException

1). TUPLE Topic にデータを書き込む

// TUPLE Topic にデータを書き込みます。
public static void tupleExample() {
    String shardId = "9";
    // スキーマを取得します。
    recordSchema = datahubClient.getTopic(Constant.projectName, Constant.topicName).getRecordSchema();
    // 10 個のデータレコードを生成します。
    List<RecordEntry> recordEntries = new ArrayList<>();
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // 各データレコードに追加属性 (IP アドレス、ホスト名など) を設定します。追加属性はオプションです。追加属性を設定しない場合、データ書き込みは影響を受けません。
        recordEntry.addAttribute("key1", "value1");
        TupleRecordData data = new TupleRecordData(recordSchema);
        data.setField("field1", "HelloWorld");
        data.setField("field2", 1234567);
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
    }
    try {
        // putRecordsByShard メソッドは、サーバーのバージョンが 2.12 以降の場合にのみサポートされます。以前のバージョンでは、putRecords メソッドを呼び出します。
        //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
        datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
        System.out.println("write data successful");
    } catch (InvalidParameterException e) {
        System.out.println("invalid parameter, please check your parameter");
        System.exit(1);
    } catch (AuthorizationFailureException e) {
        System.out.println("AK error, please check your accessId and accessKey");
        System.exit(1);
    } catch (ResourceNotFoundException e) {
        System.out.println("project or topic or shard not found");
        System.exit(1);
    } catch (ShardSealedException e) {
        System.out.println("shard status is CLOSED, can not write");
        System.exit(1);
    } catch (DatahubClientException e) {
        System.out.println("other error");
        System.out.println(e);
        System.exit(1);
    }
}

2). BLOB Topic にデータを書き込む

// BLOB Topic にデータを書き込みます。
public static void blobExample() {
    // 10 個のデータレコードを生成します。
    List<RecordEntry> recordEntries = new ArrayList<>();
    String shardId = "4";
    for (int i = 0; i < 10; ++i) {
        RecordEntry recordEntry = new RecordEntry();
        // 各データレコードに追加属性を設定します。
        recordEntry.addAttribute("key1", "value1");
        BlobRecordData data = new BlobRecordData("123456".getBytes(Charsets.UTF_8));
        recordEntry.setRecordData(data);
        recordEntry.setShardId(shardId);
        recordEntries.add(recordEntry);
        recordEntry.setShardId("0");
    }
    while (true) {
        try {
            // putRecordsByShard メソッドは、サーバーのバージョンが 2.12 以降の場合にのみサポートされます。以前のバージョンでは、putRecords メソッドを呼び出します。
            //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
            datahubClient.putRecords(Constant.projectName, Constant.topicName, recordEntries);
            System.out.println("write data  successful");
            break;
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (ShardSealedException e) {
            System.out.println("shard status is CLOSED, can not write");
            System.exit(1);
        } catch (LimitExceededException e) {
            System.out.println("maybe qps exceed limit, retry");
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
}

プロデューサーの使用

手順 1: 依存関係をインポートする

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.19.0-public</version>
</dependency>
<dependency>
      <groupId>com.aliyun.datahub</groupId>
      <artifactId>datahub-client-library</artifactId>
      <version>1.1.12-public</version>
</dependency>

手順 2: コードを記述する

import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.MalformedRecordException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.Producer;
import com.aliyun.datahub.exception.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DatahubWriter {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubWriter.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO: 例外を処理します。
        }
    }

    private static List<RecordEntry> genRecords(RecordSchema schema) {
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int cnt = 0; cnt < 10; ++cnt) {
            RecordEntry entry = new RecordEntry();

            entry.addAttribute("key1", "value1");
            entry.addAttribute("key2", "value2");

            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "testValue");
            data.setField("field2", 1);

            entry.setRecordData(data);
            recordEntries.add(entry);
        }
        return recordEntries;
    }

    private static void sendRecords(Producer producer, List<RecordEntry> recordEntries) {
        int maxRetry = 3;
        while (true) {
            try {
                // DataHub によって自動的に選択されたシャードにデータを書き込みます。
                producer.send(recordEntries, maxRetry);

                // ID が 0 のシャードにデータを書き込みます。
                // producer.send(recordEntries, "0", maxRetry);
                LOG.error("send records: {}", recordEntries.size());
                break;
            } catch (MalformedRecordException e) {
                // 書き込むデータレコードの形式が無効な場合に、エラーを無視するか例外をスローするかを指定します。
                LOG.error("write fail", e);
                throw e;
            } catch (InvalidParameterException |
                    AuthorizationFailureException |
                    NoPermissionException e) {
                // リクエストパラメーターが無効です。
                // 署名が無効です。
                // 権限がありません。
                LOG.error("write fail", e);
                throw e;
            } catch (ShardNotFoundException e) {
                // 指定されたシャードが存在しないという例外を処理します。DataHub によって自動的に選択されたシャードにデータを書き込む場合は、この例外を処理する必要はありません。
                LOG.error("write fail", e);
                sleep(1000);
            } catch (ResourceNotFoundException e) {
                // 指定されたプロジェクト、Topic、またはシャードが存在しないという例外を処理します。
                LOG.error("write fail", e);
                throw e;
            } catch (DatahubClientException e) {
                // ネットワークの問題など、基底クラスの例外を処理します。たとえば、再試行を実行できます。
                LOG.error("write fail", e);
                sleep(1000);
            }
        }
    }

    public static void main(String[] args) {
        // この例では、中国 (杭州) リージョンのエンドポイントを使用しています。実際のリージョンのエンドポイントを使用してください。
        String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
        String accessId = "<YourAccessKeyId>";
        String accessKey = "<YourAccessKeySecret>";
        String projectName = "<YourProjectName>";
        String topicName = "<YourTopicName>";


        RecordSchema schema =  datahubClient.getTopic(projectName, topicName).getRecordSchema();



        ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
        Producer producer = new Producer(projectName, topicName, config);

        // ビジネスシナリオに基づいてループを構成します。
        boolean stop = false;
        try {
            while (!stop) {
                List<RecordEntry> recordEntries = genRecords(schema);
                sendRecords(producer, recordEntries);
            }
        } finally {
            // リソースが想定どおりに解放されていることを確認します。
            producer.close();
        }
    }
}

複数の方法でデータを書き込む

サーバーのバージョンが 2.12 より前の場合、putRecords メソッドを呼び出すことによってのみ DataHub にデータを書き込むことができます。RecordEntry クラスは、shardIdpartitionKeyhashKey の 3 つの属性を提供します。これらの属性を使用して、データを書き込むシャードを指定できます。Topic の ExpandMode パラメーターが ONLY_EXTEND に設定されている場合、partitionKey 属性と hashKey 属性はサポートされません。

説明

サーバーのバージョンが 2.12 以降の場合は、putRecordsByShard メソッドを呼び出してデータを書き込むことをお勧めします。これにより、サーバー上のパーティションによって引き起こされるパフォーマンスの低下を回避できます。

1). 指定されたシャード ID に基づいてデータを書き込みます。 この方法でデータを書き込むことをお勧めします。次のコードは例を示しています。

RecordEntry entry = new RecordEntry();
entry.setShardId("0");

2). 指定されたハッシュキーに基づいてデータを書き込みます。 128 ビットの MD5 値を指定します。 DataHub は、シャードの シャードの管理の値に基づいて、データを書き込むシャードを決定します。次のコードは例を示しています。

RecordEntry entry = new RecordEntry();
entry.setHashKey("7FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD");

3). 指定されたパーティションキーに基づいてデータを書き込みます。 STRING タイプのパラメーターをパーティションキーとして指定します。DataHub は、文字列の MD5 値と、シャードの シャードの管理の値に基づいて、データを書き込むシャードを決定します。次のコードは例を示しています。

RecordEntry entry = new RecordEntry();
entry.setPartitionKey("TestPartitionKey");

コンシューマーまたはプロデューサーは、複数のスレッドを使用して DataHub にアクセスすることはできません。複数のスレッドを使用する場合は、スレッドごとに異なるコンシューマーまたはプロデューサーを指定します。