全部產品
Search
文件中心

Tablestore:建立投遞任務

更新時間:May 12, 2022

使用CreateDeliveryTask介面建立一個投遞任務。通過建立投遞任務,您可以將Tablestore資料表中的資料投遞到OSS Bucket中儲存。

注意

請確認已安裝支援資料湖投遞功能的TablestoreGo SDK。

前提條件

  • 已開通OSS服務且在Tablestore執行個體所在地區建立Bucket。具體操作,請參見開通OSS服務

  • 已通過控制台建立Tablestore服務關聯角色並記錄角色的ARN。具體操作,請參見建立投遞任務

    服務關聯角色的ARN請通過RAM控制台擷取,具體操作如下:

    RAM 角色管理介面,搜尋AliyunServiceRoleForOTSDataDelivery後,單擊角色名稱,在角色詳情介面,可以查看和複製角色的ARN資訊。

    figrole
  • 已初始化TableStoreClient。具體操作,請參見初始化

  • 已建立資料表並寫入資料。

參數

參數

說明

TableName

資料表名稱。

TaskName

投遞任務名稱。

名稱只能包含英文小寫字母(a~z)、數字和虛線(-),開頭和結尾必須為英文小寫字母或數字,且長度為3~16字元。

TaskConfig

投遞任務配置,包括如下選項:

  • OssPrefix:OSS Bucket中的目錄首碼,將Tablestore的資料投遞到該OSS Bucket目錄中。投遞路徑中支援引用$yyyy、$MM、$dd、$HH、$mm五種時間變數。

    • 當投遞路徑中引用時間變數時,可以按資料的寫入時間動態產生OSS目錄,實現hive partition naming style的資料時間分區,從而按照時間分區組織OSS中的檔案分布。

    • 當投遞路徑中不引用時間變數時,所有檔案會被投遞到固定的OSS首碼目錄中。

  • OssBucket:OSS Bucket名稱。

  • OssEndpoint:OSS Bucket所在地區的服務地址。

  • OssRoleName:Tablestore服務關聯角色的ARN資訊。

  • Format:投遞的資料的儲存以Parquet列存格式儲存,資料湖投遞預設使用PLAIN編碼方式,PLAIN編碼方式支援任意類型資料。

  • EventTimeColumn:事件時間列,用於指定按某一列資料的時間進行分區。如果不設定此參數,則按資料寫入Tablestore的時間進行分區。

  • Schema:指定需要投遞的資料列,必須手動設定投遞欄位的源表欄位、目標欄位和目標欄位類型。

    您可以選擇任意欄位以任意順序、名稱寫入列存檔案,OSS的列存資料會按Schema數組中的資料列先後順序分布。

    注意

    投遞資料的欄位類型必須與資料來源的欄位類型匹配,否則會作為髒資料丟棄。欄位類型映射詳情請參見資料格式映射

TaskType

投遞任務的類型,包括如下選項:

  • IncTask:表示增量資料投遞模式,只同步增量資料。

  • BaseTask:表示全量資料投遞模式,一次性全表掃描資料同步。

  • BaseIncTask(預設):表示全量&增量資料投遞模式,全量資料同步完成後,再同步增量資料。

    其中增量資料同步時可以擷取最新投遞時間和瞭解當前投遞狀態。

樣本

func CreateTaskSample(client *tablestore.TableStoreClient) {
 createTask := &tablestore.CreateDeliveryTaskRequest{
  TableName: "sampleTable",
  TaskName: "sampledeliverytask",
  TaskType: tablestore.BaseIncTask,
  TaskConfig: &tablestore.OSSTaskConfig{
   OssPrefix:   "sample/year=$yyyy/month=$MM",
   OssBucket:      "datadeliverytest",
   OssEndpoint:    "oss-cn-hangzhou.aliyuncs.com",
   OssRoleName:    "acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery",
   Schema: []*tablestore.TaskSchema{
    {
     ColumnName: "PK1",
     OssColumnName: "PK1",
     Type: tablestore.ParquetInt64,
    },
    {
     ColumnName: "PK2",
     OssColumnName: "PK2",
     Type: tablestore.ParquetUtf8,
    },
    {
     ColumnName: "Col1",
     OssColumnName: "Col1",
     Type: tablestore.ParquetDouble,
    },


   },
  },
 }
 createResp, err := client.CreateDeliveryTask(createTask)
 if err != nil {
  log.Fatal("create delivery task failed ", err)
 }
 fmt.Println("create delivery task success ", createResp.RequestId)
}