訂閱功能使用介紹
目前使用者在DataHub上消費Topic資料,為了做到“斷點續消費”的功能,即消費者failover重啟後可以繼續從failover時間點繼續擷取資料,需要使用者自己儲存當前消費的點位資訊,同時使用者還需要關心自己的點位儲存服務的高可用,這無疑增加了使用者開發應用程式的複雜度。基於此,DataHub新上線的訂閱服務提供了服務端儲存使用者消費點位的功能,使用者只需要通過簡單的幾步配置,然後在自己的應用邏輯裡添加幾行簡單的處理邏輯,就可以擁有一個對自己“透明”的高可用的點位儲存服務。另外,訂閱服務還提供了靈活的點位重設功能,從而保證用了“At Least Once”的消費語義,比如使用者發現自己應用程式有個時間段消費的資料處理上存在問題,想重新消費,此時只需要將點位重設到對應的時間點,並且無須重啟自己的應用程式,可以做到應用程式自動感知。
建立訂閱
需要確保自己帳號有許可權對特定project下的topic有訂閱許可權,具體授權參見許可權控制說明文檔。建立步驟如下:
開啟Topic頁面,單機右上方 +訂閱 按鈕,填寫訂閱詳情,點擊 建立
訂閱應用:用來說明當前訂閱的應用程式名稱
描述:當前訂閱的詳情描述

點擊消費點位下方搜尋按鈕,即可查看所有shard消費情況
2. 使用樣本
訂閱功能為使用者提供了點位儲存的能力,與DataHub讀寫功能(參見JavaSDK說明文檔)並無必然聯絡,不過二者可以配合使用,即資料讀取後使用者需要將消費點位進行儲存的情境。
參考代碼
//點位消費樣本,並在消費過程中進行點位的提交
public void offset_consumption(int maxRetry) {
String endpoint = "<YourEndPoint>";
String accessId = "<YourAccessId>";
String accessKey = "<YourAccessKey>";
String projectName = "<YourProjectName>";
String topicName = "<YourTopicName>";
String subId = "<YourSubId>";
String shardId = "0";
List<String> shardIds = Arrays.asList(shardId);
// 建立DataHubClient執行個體
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
// 是否開啟二進位傳輸,服務端2.12版本開始支援
new AliyunAccount(accessId, accessKey), true))
.build();
RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
OpenSubscriptionSessionResult openSubscriptionSessionResult = datahubClient.openSubscriptionSession(projectName, topicName, subId, shardIds);
SubscriptionOffset subscriptionOffset = openSubscriptionSessionResult.getOffsets().get(shardId);
// 1、擷取當前點位的cursor,如果當前點位已到期則擷取生命週期內第一條record的cursor,未消費同樣擷取生命週期內第一條record的cursor
String cursor = "";
//sequence < 0說明未消費
if (subscriptionOffset.getSequence() < 0) {
// 擷取生命週期內第一條record的cursor
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
} else {
// 擷取下一條記錄的Cursor
long nextSequence = subscriptionOffset.getSequence() + 1;
try {
//按照SEQUENCE getCursor可能報SeekOutOfRange錯誤,表示當前cursor的資料已到期
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
} catch (SeekOutOfRangeException e) {
// 擷取生命週期內第一條record的cursor
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
}
}
// 2、讀取並儲存點位,這裡以讀取Tuple資料為例,並且每1000條記錄儲存一次點位
long recordCount = 0L;
// 每次讀取1000條record
int fetchNum = 1000;
int retryNum = 0;
int commitNum = 1000;
while (retryNum < maxRetry) {
try {
GetRecordsResult getRecordsResult = datahubClient.getRecords(projectName, topicName, shardId, schema, cursor, fetchNum);
if (getRecordsResult.getRecordCount() <= 0) {
// 無資料,sleep後讀取
System.out.println("no data, sleep 1 second");
Thread.sleep(1000);
continue;
}
for (RecordEntry recordEntry : getRecordsResult.getRecords()) {
//消費資料
TupleRecordData data = (TupleRecordData) recordEntry.getRecordData();
System.out.println("field1:" + data.getField("field1") + "\t"
+ "field2:" + data.getField("field2"));
// 處理資料完成後,設定點位
recordCount++;
subscriptionOffset.setSequence(recordEntry.getSequence());
subscriptionOffset.setTimestamp(recordEntry.getSystemTime());
// commit offset every 1000 records
if (recordCount % commitNum == 0) {
//提交點位點位
Map<String, SubscriptionOffset> offsetMap = new HashMap<>();
offsetMap.put(shardId, subscriptionOffset);
datahubClient.commitSubscriptionOffset(projectName, topicName, subId, offsetMap);
System.out.println("commit offset successful");
}
}
cursor = getRecordsResult.getNextCursor();
} catch (SubscriptionOfflineException | SubscriptionSessionInvalidException e) {
// 退出. Offline: 訂閱下線; SessionChange: 表示訂閱被其他用戶端同時消費
e.printStackTrace();
throw e;
} catch (SubscriptionOffsetResetException e) {
// 點位被重設,需要重新擷取SubscriptionOffset版本資訊
SubscriptionOffset offset = datahubClient.getSubscriptionOffset(projectName, topicName, subId, shardIds).getOffsets().get(shardId);
subscriptionOffset.setVersionId(offset.getVersionId());
// 點位被重設之後,需要重新擷取點位,擷取點位的方法應該與重設點位時一致,
// 如果重設點位時,同時設定了sequence和timestamp,那麼既可以用SEQUENCE擷取,也可以用SYSTEM_TIME擷取
// 如果重設點位時,只設定了sequence,那麼只能用sequence擷取,
// 如果重設點位時,只設定了timestamp,那麼只能用SYSTEM_TIME擷取點位
// 一般情況下,優先使用SEQUENCE,其次是SYSTEM_TIME,如果都失敗,則採用OLDEST擷取
cursor = null;
if (cursor == null) {
try {
long nextSequence = offset.getSequence() + 1;
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SEQUENCE, nextSequence).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException exception) {
System.out.println("get cursor by SEQUENCE failed, try to get cursor by SYSTEM_TIME");
}
}
if (cursor == null) {
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.SYSTEM_TIME, offset.getTimestamp()).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException exception) {
System.out.println("get cursor by SYSTEM_TIME failed, try to get cursor by OLDEST");
}
}
if (cursor == null) {
try {
cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();
System.out.println("get cursor successful");
} catch (DatahubClientException exception) {
System.out.println("get cursor by OLDEST failed");
System.out.println("get cursor failed!!");
throw e;
}
}
} catch (LimitExceededException e) {
// limit exceed, retry
e.printStackTrace();
retryNum++;
} catch (DatahubClientException e) {
// other error, retry
e.printStackTrace();
retryNum++;
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
}運行結果
1、第一次啟動時會從最早的資料開始消費,運行過程中可以重新整理webconsole上的訂閱頁面,shard 的消費點位都在往前移動。2、如果在消費過程中,通過webconsole上的重設點位功能來手動調整點位的話,我們的消費程式會自動感知到點位變化從新調整後的點位開始消費,這時用戶端通過捕獲OffsetResetedException異常後調用getSubscriptionOffset介面從服務端擷取到最新的SubscriptionOffset對象,然後繼續從新點位開始消費。3、注意:同一個訂閱下的相同shard不要同時被多個消費線程/進程消費,否則點位會被交替覆蓋,也就是最終服務端儲存的點位是未定義的,這種情況下服務端會拋OffsetSessionChangedException異常,建議用戶端對這類異常進行捕獲後做exit處理,檢查是否存在重複消費的設計。