全部產品
Search
文件中心

DataHub:Topic操作

更新時間:Jul 17, 2025

本文為您展示DataHub的 Java SDK的Topic操作。

Topic說明

Topic是 DataHub 訂閱和發布的最小單位,使用者可以用Topic來表示一類或者一種流資料,目前支援Tuple與Blob兩種類型:

  • Blob類型Topic支援寫入一塊位元據作為一個Record。

  • Tuple類型的Topic支援類似於資料庫的記錄的資料,每條記錄包含多個列,需要指定Record Schema,因為網路傳輸中,資料都是以字串的形式發送,需要schema來轉換成對應的類型。

    僅支援以下資料類型:

    類型

    含義

    範圍

    BIGINT

    8位元組有符號整型

    -9223372036854775807 ~ 9223372036854775807

    DOUBLE

    8位元組雙精確度浮點數

    -1.0 _10^308 ~ 1.0 _10^308

    BOOLEAN

    布爾類型

    可取以下任意一組:

    • True/False

    • true/false

    • 0/1

    TIMESTAMP

    時間戳記類型

    表示到微秒的時間戳記。

    STRING

    字串,只支援UTF-8編碼

    單個STRING列最長允許2MB。

    TINYINT

    單位元組整型

    -128 ~127

    SMALLINT

    雙位元組整型

    -32768 ~ 32767

    INTEGER

    4位元組整型

    -2147483648 ~ 2147483647

    FLOAT

    4位元組單精確度浮點數

    -3.40292347_10^38 ~ 3.40292347_10^38

    說明

    DataHub 中的 TINYINTSMALLINTINTEGERFLOAT類型從java sdk 2.16.1-public開始支援。

建立Topic

建立Tuple Topic

參數說明

參數名

參數類型

參數說明

projectName

String

專案名稱。

topicName

String

Topic名稱。

shardCount

int

初始shard數量。

lifeCycle

int

資料生命週期(單位:天)。

recordType

RecordType

寫入的Record類型,現僅支援 TUPLE和BLOB。

recordSchema

RecordSchema

Topic的records schema。

comment

String

Topic的描述。

異常說明

異常類名

錯誤碼

異常說明

DatahubClientException

-

並且是所有異常的基類

程式碼範例

 public static void createTupleTopic(String projectName, String topicName, int shardCount, int lifeCycle,  String topicComment) {
   RecordSchema schema = new RecordSchema();
   schema.addField(new Field("bigint_field", FieldType.BIGINT));
   schema.addField(new Field("double_field", FieldType.DOUBLE));
   schema.addField(new Field("boolean_field", FieldType.BOOLEAN));
   schema.addField(new Field("timestamp_field", FieldType.TIMESTAMP));
   schema.addField(new Field("tinyint_field", FieldType.TINYINT));
   schema.addField(new Field("smallint_field", FieldType.SMALLINT));
   schema.addField(new Field("integer_field", FieldType.INTEGER));
   schema.addField(new Field("floar_field", FieldType.FLOAT));
   schema.addField(new Field("decimal_field", FieldType.DECIMAL));
   schema.addField(new Field("string_field", FieldType.STRING));
   try {
       datahubClient.createTopic(projectName,topicName, shardCount, lifeCycle, RecordType.TUPLE, schema, topicComment);
       System.out.println("create topic successful");
   } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
   }
 }

建立Blob Topic

參數說明

參數名

參數類型

參數說明

projectName

String

專案名稱。

topicName

String

Topic名稱

shardCount

int

初始shard數量

lifeCycle

int

資料生命週期(單位:天)。

recordType

RecordType

寫入的Record類型,現僅支援 TUPLE和BLOB

comment

String

Topic的說明。

異常說明

異常類名

錯誤碼

異常說明

DatahubClientException

-

並且是所有異常的基類

InvalidParameterException

InvalidParameter

InvalidCursor

非法參數。

AuthorizationFailureException

Unauthorized

Authorization 簽名解析異常,檢查AK是否填寫正確。

ResourceNotFoundException

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

訪問的資源不存在。

說明

進行Split/Merge操作後,立即發送其他請求,有可能會拋出該異常

ResourceAlreadyExistException

ResourceAlreadyExist

ProjectAlreadyExist

TopicAlreadyExist

ConnectorAlreadyExist

資源已存在。(建立時如果資源已存在,就會拋出這個異常)。

程式碼範例

public static void createBlobTopic(String projectName, String topicName, int shardCount, int lifeCycle,  String topicComment) {
  try {
      datahubClient.createTopic(projectName, blobTopicName, shardCount, lifeCycle, RecordType.BLOB, topicComment);
      System.out.println("create topic successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());
  }

刪除Topic

刪除Topic之前需保證Topic中沒有subscriptionconnector,否則會異常:NoPermissionException

參數說明

參數名

參數類型

參數說明

projectName

String

專案名稱。

topicName

String

Topic名稱。

異常描述

異常類名

錯誤碼

異常說明

DatahubClientException

-

並且是所有異常的基類

NoPermissionException

NoPermission

OperationDenied

沒有許可權,通常是RAM配置不正確,或沒有正確授權子帳號。

程式碼範例

public static void deleteTopic(String projectName, String topicName) {
  try {
      datahubClient.deleteTopic(projectName, topicName);
      System.out.println("delete topic successful");
  } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

  }
}

列出Topic

以列表的形式列出設定項目下的所有Topic。

參數說明

參數名

參數類型

參數說明

projectName

String

專案名稱。

程式碼範例

   public static void listTopic(String projectName ) {
      try {
          ListTopicResult listTopicResult = datahubClient.listTopic(projectName);
          if (listTopicResult.getTopicNames().size() > 0) {
              for (String tName : listTopicResult.getTopicNames()) {
                  System.out.println(tName);
              }
          }
      } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());          
      }
  }

更新Topic

更新Topic資訊,可更新Topic的描述以及Topic的生命週期

參數說明

參數名

參數類型

參數說明

projectName

String

專案名稱。

topicName

String

Topic名稱。

comment

int

Topic描述

lifeCycle

String

Topic生命週期

異常說明

異常類名

錯誤碼

異常說明

DatahubClientException

-

並且是所有異常的基類

程式碼範例

   public static void updateTopic(String projectName, String topicName, int lifeCycle, String comment) {
        try {
            comment = "new topic comment";
             lifeCycle = 1;
            datahubClient.updateTopic(projectName, Constant.topicName,lifeCycle, comment);
            System.out.println("update topic successful");
            //查看更新後結果
            GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
            System.out.println(getTopicResult.getComment());
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());
        }
    }

查詢Topic

根據專案名稱和Topic名稱來查詢Topic的相關屬性。

參數說明

參數名

參數類型

參數說明

projectName

String

專案名稱。

topicName

String

Topic名稱。

異常說明

異常類名

錯誤碼

異常說明

DatahubClientException

-

並且是所有異常的基類

程式碼範例

   public static void getTopic(String projectName, String topicName) {
        try {
            GetTopicResult getTopicResult = datahubClient.getTopic(projectName, topicName);
            System.out.println(getTopicResult.getShardCount() + "\t"
                    + getTopicResult.getLifeCycle() + "\t"
                    + getTopicResult.getRecordType() + "\t"
                    + getTopicResult.getComment());
        } catch (DatahubClientException e) {
            System.out.println(e.getErrorMessage());

        }
    }

更多操作

Tuple Topic 新增 Field

在對Tuple Topic新增Field時,可新增一列,也可一次性插入多列。

參數說明

參數名

參數類型

參數說明

projectName

String

專案名稱。

topicName

String

Topic名稱。

fields

Field

新增列,不允許為空白

異常說明

異常類名

錯誤碼

異常說明

DatahubClientException

-

並且是所有異常的基類

程式碼範例

public static void appendNewField(String projectName,String topicName) {
    try {
        Field newField = new Field("newField", FieldType.STRING, true,"comment");
        datahubClient.appendField(projectName, topicName, newField);
        System.out.println("append field successful");
    } catch (DatahubClientException e) {
        System.out.println(e.getErrorMessage());

    }
}