本文為您展示DataHub的 Java SDK的Topic操作。
Topic說明
Topic是 DataHub 訂閱和發布的最小單位,使用者可以用Topic來表示一類或者一種流資料,目前支援Tuple與Blob兩種類型:
Blob類型Topic支援寫入一塊位元據作為一個Record。
Tuple類型的Topic支援類似於資料庫的記錄的資料,每條記錄包含多個列,需要指定Record Schema,因為網路傳輸中,資料都是以字串的形式發送,需要schema來轉換成對應的類型。
僅支援以下資料類型:
類型
含義
範圍
BIGINT
8位元組有符號整型
-9223372036854775807 ~ 9223372036854775807DOUBLE
8位元組雙精確度浮點數
-1.0 _10^308 ~ 1.0 _10^308BOOLEAN
布爾類型
可取以下任意一組:
True/False
true/false
0/1
TIMESTAMP
時間戳記類型
表示到微秒的時間戳記。
STRING
字串,只支援UTF-8編碼
單個STRING列最長允許2MB。
TINYINT
單位元組整型
-128 ~127SMALLINT
雙位元組整型
-32768 ~ 32767INTEGER
4位元組整型
-2147483648 ~ 2147483647FLOAT
4位元組單精確度浮點數
-3.40292347_10^38 ~ 3.40292347_10^38說明DataHub 中的
TINYINT、SMALLINT、INTEGER、FLOAT類型從java sdk 2.16.1-public開始支援。
建立Topic
建立Tuple Topic
建立Blob Topic
刪除Topic
刪除Topic之前需保證Topic中沒有subscription和connector,否則會異常:NoPermissionException。
參數說明
參數名 | 參數類型 | 參數說明 |
projectName | String | 專案名稱。 |
topicName | String | Topic名稱。 |
異常描述
異常類名 | 錯誤碼 | 異常說明 |
DatahubClientException | - | 並且是所有異常的基類 |
NoPermissionException |
| 沒有許可權,通常是RAM配置不正確,或沒有正確授權子帳號。 |
程式碼範例
public static void deleteTopic(String projectName, String topicName) {
try {
datahubClient.deleteTopic(projectName, topicName);
System.out.println("delete topic successful");
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}列出Topic
以列表的形式列出設定項目下的所有Topic。
參數說明
參數名 | 參數類型 | 參數說明 |
projectName | String | 專案名稱。 |
程式碼範例
public static void listTopic(String projectName ) {
try {
ListTopicResult listTopicResult = datahubClient.listTopic(projectName);
if (listTopicResult.getTopicNames().size() > 0) {
for (String tName : listTopicResult.getTopicNames()) {
System.out.println(tName);
}
}
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}更新Topic
更新Topic資訊,可更新Topic的描述以及Topic的生命週期。
參數說明
參數名 | 參數類型 | 參數說明 |
projectName | String | 專案名稱。 |
topicName | String | Topic名稱。 |
comment | int | Topic描述 |
lifeCycle | String | Topic生命週期 |
異常說明
異常類名 | 錯誤碼 | 異常說明 |
DatahubClientException | - | 並且是所有異常的基類 |
程式碼範例
public static void updateTopic(String projectName, String topicName, int lifeCycle, String comment) {
try {
comment = "new topic comment";
lifeCycle = 1;
datahubClient.updateTopic(projectName, Constant.topicName,lifeCycle, comment);
System.out.println("update topic successful");
//查看更新後結果
GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
System.out.println(getTopicResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}查詢Topic
根據專案名稱和Topic名稱來查詢Topic的相關屬性。
參數說明
參數名 | 參數類型 | 參數說明 |
projectName | String | 專案名稱。 |
topicName | String | Topic名稱。 |
異常說明
異常類名 | 錯誤碼 | 異常說明 |
DatahubClientException | - | 並且是所有異常的基類 |
程式碼範例
public static void getTopic(String projectName, String topicName) {
try {
GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
System.out.println(getTopicResult.getShardCount() + "\t"
+ getTopicResult.getLifeCycle() + "\t"
+ getTopicResult.getRecordType() + "\t"
+ getTopicResult.getComment());
} catch (DatahubClientException e) {
System.out.println(e.getErrorMessage());
}
}