全部產品
Search
文件中心

Tablestore:建立投遞任務

更新時間:May 12, 2022

使用CreateDeliveryTask介面建立一個投遞任務。通過建立投遞任務,您可以將Tablestore資料表中的資料投遞到OSS Bucket中儲存。

注意

請確保已安裝支援資料湖投遞功能的Tablestore Java SDK。關於Tablestore Java SDK版本的更多資訊,請參見Java SDK歷史迭代版本

前提條件

  • 已開通OSS服務且在Tablestore執行個體所在地區建立Bucket。具體操作,請參見開通OSS服務

  • 已通過控制台建立Tablestore服務關聯角色並記錄角色的ARN。具體操作,請參見建立投遞任務

    服務關聯角色的ARN請通過RAM控制台擷取,具體操作如下:

    RAM 角色管理介面,搜尋AliyunServiceRoleForOTSDataDelivery後,單擊角色名稱,在角色詳情介面,可以查看和複製角色的ARN資訊。

  • 已初始化OTSClient。具體操作,請參見初始化

  • 已建立資料表並寫入資料。

參數

參數

說明

tableName

資料表名稱。

taskName

投遞任務名稱。

名稱只能包含英文小寫字母(a~z)、數字和虛線(-),開頭和結尾必須為英文小寫字母或數字,且長度為3~16字元。

taskConfig

投遞任務配置,包括如下選項:

  • ossPrefix:OSS Bucket中的目錄首碼,將Tablestore的資料投遞到該OSS Bucket目錄中。投遞路徑中支援引用$yyyy、$MM、$dd、$HH、$mm五種時間變數。

    • 當投遞路徑中引用時間變數時,可以按資料的寫入時間動態產生OSS目錄,實現hive partition naming style的資料時間分區,從而按照時間分區組織OSS中的檔案分布。

    • 當投遞路徑中不引用時間變數時,所有檔案會被投遞到固定的OSS首碼目錄中。

  • ossBucket:OSS Bucket名稱。

  • ossEndpoint:OSS Bucket所在地區的服務地址。

  • ossStsRole:Tablestore服務關聯角色的ARN資訊。

  • format:投遞的資料的儲存以Parquet列存格式儲存,資料湖投遞預設使用PLAIN編碼方式,PLAIN編碼方式支援任意類型資料。

  • eventTimeColumn:事件時間列,用於指定按某一列資料的時間進行分區。如果不設定此參數,則按資料寫入Tablestore的時間進行分區。

  • parquetSchema:指定需要投遞的資料列,必須手動設定投遞欄位的源表欄位、目標欄位和目標欄位類型。

    您可以選擇任意欄位以任意順序、名稱寫入列存檔案,OSS的列存資料會按Schema數組中的資料列先後順序分布。

    注意

    投遞資料的欄位類型必須與資料來源的欄位類型匹配,否則會作為髒資料丟棄。欄位類型映射詳情請參見資料格式映射

taskType

投遞任務的類型,包括如下選項:

  • INC:表示增量資料投遞模式,只同步增量資料。

  • BASE:表示全量資料投遞模式,一次性全表掃描資料同步。

  • BASE_INC(預設):表示全量&增量資料投遞模式,全量資料同步完成後,再同步增量資料。

    其中增量資料同步時可以擷取最新投遞時間和瞭解當前投遞狀態。

樣本

private static void createDeliveryTask(SyncClient client) {
    String tableName = "sampleTable";
    String taskName = "sampledeliverytask";
    OSSTaskConfig taskConfig = new OSSTaskConfig();
    taskConfig.setOssPrefix("sampledeliverytask/year=$yyyy/month=$MM");
    taskConfig.setOssBucket("datadeliverytest");
    taskConfig.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");
    taskConfig.setOssStsRole("acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery"); //eventColumn為可選配置,指定按某一列資料的時間進行分區。如果不設定此參數,則按資料寫入Tablestore的時間進行分區。
    EventColumn eventColumn = new EventColumn("PK1", EventTimeFormat.RFC1123);
    taskConfig.setEventTimeColumn(eventColumn);
    taskConfig.addParquetSchema(new ParquetSchema("PK1", "PK1", DataType.UTF8));
    taskConfig.addParquetSchema(new ParquetSchema("PK2", "PK2", DataType.BOOL));
    taskConfig.addParquetSchema(new ParquetSchema("Col1", "Col1", DataType.UTF8));
    CreateDeliveryTaskRequest request = new CreateDeliveryTaskRequest();
    request.setTableName(tableName);
    request.setTaskName(taskName);
    request.setTaskConfig(taskConfig);
    request.setTaskType(DeliveryTaskType.BASE_INC);
    CreateDeliveryTaskResponse response = client.createDeliveryTask(request);
    System.out.println("resquestID: "+ response.getRequestId());
    System.out.println("traceID: " + response.getTraceId());
    System.out.println("create delivery task success");
}