サブスクリプション機能の概要
DataHub トピックのデータを使用し、アプリケーションに障害が発生した場合にその時点から使用を再開する必要があるシナリオでは、再開可能な使用が必要です。 使用を再開する必要がある場合は、現在の使用オフセットを保存し、使用オフセットの保存に使用されるサービスが高可用性をサポートしていることを確認する必要があります。 これにより、アプリケーション開発の複雑さが増します。 DataHub のサブスクリプション機能を使用すると、使用オフセットをサーバーに保存して、前述の問題を解決できます。 この機能を有効にし、アプリケーションに数行のコードを追加するだけで、高可用性を備えた使用オフセット保守サービスを利用できます。 さらに、サブスクリプション機能では、使用オフセットをリセットできます。 これにより、データが少なくとも 1 回は使用されることが保証されます。 たとえば、アプリケーションが特定の期間に使用されたデータを処理しているときにエラーが発生し、そのデータを再度使用したいとします。 この場合、アプリケーションを再起動せずに使用オフセットをリセットできます。 アプリケーションは、指定された使用オフセットから自動的にデータを使用します。
サブスクリプションの作成
アカウントに、特定のプロジェクトのトピックをサブスクライブする権限があることを確認します。 詳細については、「アクセス制御」をご参照ください。 サブスクリプションを作成するには、次の手順を実行します。
トピックの詳細ページに移動します。 右上隅にある [サブスクリプション] をクリックします。

[サブスクリプションの作成] パネルで、必要に応じてパラメーターを設定します。 次に、[作成] をクリックします。
アプリケーション: サブスクリプションを作成するアプリケーションの名前。
説明: サブスクリプションの説明。

[サブスクリプションリスト] タブで、作成されたサブスクリプションを見つけ、[コンシューマーオフセット] 列の検索アイコンをクリックして、すべてのシャードの使用状況を表示します。


2. 例
サブスクリプション機能を使用すると、使用オフセットを保存できます。 データの読み取り後に使用オフセットを保存する必要があるシナリオでは、DataHub の読み取りおよび書き込み機能を使用オフセットの保存機能とともに使用できます。 DataHub の読み取りおよび書き込み機能の詳細については、「Java 用 DataHub SDK」をご参照ください。
サンプルコード
// 保存された使用オフセットからデータを使用し、使用中に使用オフセットを送信する方法の例を次のサンプルコードに示します。
public void offset_consumption(int maxRetry) {
String endpoint = "<YourEndPoint>";
String accessId = "<YourAccessId>";
String accessKey = "<YourAccessKey>";
String projectName = "<YourProjectName>";
String topicName = "<YourTopicName>";
String subId = "<YourSubId>";
String shardId = "0";
List<String> shardIds = Arrays.asList(shardId);
// DataHub クライアントを作成します。
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
// バイナリデータ転送を有効にするかどうかを指定します。 V2.12 以降のサーバーは、バイナリデータ転送をサポートしています。
new AliyunAccount(accessId, accessKey), true))
.build();
RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1. 現在の使用オフセットにあるレコードのカーソルを取得します。 レコードの有効期限が切れているか、レコードが使用されていない場合は、トピックの有効期間 (TTL) 内の最初のレコードのカーソルを取得します。
String cursor = "";
// シーケンス番号が 0 より小さい場合、レコードは使用されていません。
if (subscriptionOffset.getSequence() < 0) {
// トピックの TTL 期間内の最初のレコードのカーソルを取得します。
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// 次のレコードのカーソルを取得します。
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
// シーケンス番号に基づいてカーソルを取得した後に SeekOutOfRange エラーが返された場合、レコードの有効期限が切れています。
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// トピックの TTL 期間内の最初のレコードのカーソルを取得します。
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2. レコードを読み取り、使用オフセットを保存します。 この例では、TUPLE タイプのレコードを読み取り、1,000 レコードが読み取られるたびに 使用オフセットを保存します。
long recordCount = 0L;
// 毎回 1,000 レコードを読み取ります。
int fetchNum = 1000;
int retryNum = 0;
int commitNum = 1000;
while (retryNum < maxRetry) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// 読み取ることができるレコードがない場合は、スレッドを 1 秒間一時停止し、レコードの読み取りを続行します。
System.out.println("no data, sleep 1 second");
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
// データを使用します。
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// データが使用された後に使用オフセットを保存します。
recordCount++;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
// 1000 レコードごとにオフセットをコミットします
if (recordCount % commitNum == 0) {
// 使用オフセットを送信します。
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
System.out.println("commit offset successful");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
// サブスクリプションセッションが終了しました。 オフライン: サブスクリプションはオフラインです。 SessionChange: サブスクリプションは他のクライアントによって使用されています。
e.printStackTrace();
throw e;
} catch (SubscriptionOffsetResetException e) {
// 使用オフセットがリセットされます。 使用オフセットのバージョン情報を再度取得する必要があります。
SubscriptionOffset offset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
subscriptionOffset.setVersionId(offset.getVersionId());
// 使用オフセットがリセットされた後、使用オフセットにあるレコードのカーソルを再度取得する必要があります。 カーソルの取得に使用される方法は、使用オフセットのリセットに使用される方法によって異なります。
// シーケンス番号とタイムスタンプの両方が使用オフセットのリセットに指定されている場合は、シーケンス番号またはタイムスタンプに基づいてカーソルを取得できます。
// シーケンス番号のみが使用オフセットのリセットに指定されている場合は、シーケンス番号のみに基づいてカーソルを取得できます。
// タイムスタンプのみが使用オフセットのリセットに指定されている場合は、タイムスタンプのみに基づいてカーソルを取得できます。
// ほとんどの場合、シーケンス番号に基づいて優先的にカーソルを取得します。 シーケンス番号またはタイムスタンプに基づいてカーソルを取得できなかった場合は、最も古いレコードのカーソルを取得します。
cursor = null;
if (cursor == null) {
try {
long nextSequence = offset.getSequence() + 1;
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException exception) {
System.out.println("get cursor by SEQUENCE failed, try to get cursor by SYSTEM_TIME");
}
}
if (cursor == null) {
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, offset.getTimestamp()).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException exception) {
System.out.println("get cursor by SYSTEM_TIME failed, try to get cursor by OLDEST");
}
}
if (cursor == null) {
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException exception) {
System.out.println("get cursor by OLDEST failed");
System.out.println("get cursor failed!!");
throw e;
}
}
} catch (LimitExceededException e) {
// 制限を超えました。再試行してください
e.printStackTrace();
retryNum++;
} catch (DatahubClientException e) {
// その他のエラー、再試行
e.printStackTrace();
retryNum++;
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
}結果
1. アプリケーションを初めて起動すると、アプリケーションは最も古いレコードからデータを使用します。アプリケーションの実行中に、DataHub コンソールの [サブスクリプションリスト] タブを更新できます。 シャードの消費オフセットが先に進みます。 2. 使用中に DataHub コンソールで [リセット] をクリックして消費オフセットをリセットすると、アプリケーションは消費オフセットの変更を自動的に検出し、指定された消費オフセットからデータを使用します。 アプリケーションが OffsetResetedException をキャッチすると、アプリケーションは getSubscriptionOffset メソッドを呼び出して、サーバーから最新の消費オフセットを照会します。 その後、アプリケーションは最新の消費オフセットからデータを使用できます。 3. サブスクリプションのシャードは、複数のスレッドまたはプロセスで同時に使用できないことに注意してください。 そうしないと、あるスレッドによって送信された消費オフセットが別のスレッドによって送信された消費オフセットによって上書きされ、サーバーは保存された消費オフセットがどのスレッドに属しているかを判断できません。 この場合、サーバーは OffsetSessionChangedException をスローします。 この例外がキャッチされた場合は、サブスクリプションセッションを終了して、データが繰り返し使用されているかどうかを確認することをお勧めします。