全部產品
Search
文件中心

Tablestore:使用SDK

更新時間:May 13, 2022

使用SDK進行資料投遞前,您需要瞭解使用資料湖投遞功能的注意事項、介面等資訊。建立投遞任務後,Tablestore資料表中的資料會自動投遞到OSS Bucket中儲存。

注意事項

  • 目前支援使用資料湖投遞功能的地區有華東1(杭州)、華東2(上海)、華北2(北京)和華北3(張家口)。

  • 資料湖投遞不支援同步刪除操作,Tablestore中的刪除操作在資料投遞時會被忽略,已投遞到OSS中的資料不會被刪除。

  • 建立資料投遞任務時存在最多1分鐘的初始化時間。

  • 資料同步存在延遲,寫入速率穩定時,延遲在3分鐘內。資料同步的P99延遲在10分鐘內。

    說明

    P99延遲表示過去10秒內最慢的1%的請求的平均延遲。

介面

介面

說明

CreateDeliveryTask

建立一個投遞任務。

ListDeliveryTask

列出一個資料表所有的投遞任務資訊。

DescribeDeliveryTask

查詢投遞任務描述資訊。

DeleteDeliveryTask

刪除一個投遞任務。

使用

您可以使用如下語言的SDK實現資料湖投遞功能。

參數

參數

說明

tableName

資料表名稱。

taskName

投遞任務名稱。

名稱只能包含英文小寫字母(a~z)、數字和虛線(-),開頭和結尾必須為英文小寫字母或數字,且長度為3~16字元。

taskConfig

投遞任務配置,包括如下選項:

  • ossPrefix:OSS Bucket中的目錄首碼,將Tablestore的資料投遞到該OSS Bucket目錄中。投遞路徑中支援引用$yyyy、$MM、$dd、$HH、$mm五種時間變數。

    • 當投遞路徑中引用時間變數時,可以按資料的寫入時間動態產生OSS目錄,實現hive partition naming style的資料時間分區,從而按照時間分區組織OSS中的檔案分布。

    • 當投遞路徑中不引用時間變數時,所有檔案會被投遞到固定的OSS首碼目錄中。

  • ossBucket:OSS Bucket名稱。

  • ossEndpoint:OSS Bucket所在地區的服務地址。

  • ossStsRole:Tablestore服務關聯角色的ARN資訊。

  • format:投遞的資料的儲存以Parquet列存格式儲存,資料湖投遞預設使用PLAIN編碼方式,PLAIN編碼方式支援任意類型資料。

  • eventTimeColumn:事件時間列,用於指定按某一列資料的時間進行分區。如果不設定此參數,則按資料寫入Tablestore的時間進行分區。

  • parquetSchema:指定需要投遞的資料列,必須手動設定投遞欄位的源表欄位、目標欄位和目標欄位類型。

    您可以選擇任意欄位以任意順序、名稱寫入列存檔案,OSS的列存資料會按Schema數組中的資料列先後順序分布。

    注意

    投遞資料的欄位類型必須與資料來源的欄位類型匹配,否則會作為髒資料丟棄。欄位類型映射詳情請參見資料格式映射

taskType

投遞任務的類型,包括如下選項:

  • INC:表示增量資料投遞模式,只同步增量資料。

  • BASE:表示全量資料投遞模式,一次性全表掃描資料同步。

  • BASE_INC(預設):表示全量&增量資料投遞模式,全量資料同步完成後,再同步增量資料。

    其中增量資料同步時可以擷取最新投遞時間和瞭解當前投遞狀態。

樣本

import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.delivery.*;
public class DeliveryTask {

        public static void main(String[] args) {
            final String endPoint = "https://yourinstancename.cn-hangzhou.ots.aliyuncs.com";

            final String accessKeyId = "LT********************g5";

            final String accessKeySecret = "Er**************************Yc";

            final String instanceName = "yourinstancename";

            SyncClient client = new SyncClient(endPoint, accessKeyId, accessKeySecret, instanceName);
            try {
                createDeliveryTask(client);
                System.out.println("end");
            } catch (TableStoreException e) {
                System.err.println("操作失敗,詳情:" + e.getMessage() + e.getErrorCode() + e.toString());
                System.err.println("Request ID:" + e.getRequestId());
            } catch (ClientException e) {
                System.err.println("請求失敗,詳情:" + e.getMessage());
            } finally {
                client.shutdown();
            }
        }

        private static void createDeliveryTask(SyncClient client){
            String tableName = "sampleTable";
            String taskName = "sampledeliverytask";
            OSSTaskConfig taskConfig = new OSSTaskConfig();
            taskConfig.setOssPrefix("sampledeliverytask/year=$yyyy/month=$MM");
            taskConfig.setOssBucket("datadeliverytest");
            taskConfig.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");
            taskConfig.setOssStsRole("acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery");
            //eventColumn為可選配置,指定按某一列資料的時間進行分區。如果不設定此參數,則按資料寫入Tablestore的時間進行分區。
            EventColumn eventColumn = new EventColumn("Col1", EventTimeFormat.RFC1123);
            taskConfig.setEventTimeColumn(eventColumn);
            taskConfig.addParquetSchema(new ParquetSchema("PK1", "PK1", DataType.UTF8));
            taskConfig.addParquetSchema(new ParquetSchema("PK2", "PK2", DataType.BOOL));
            taskConfig.addParquetSchema(new ParquetSchema("Col1", "Col1", DataType.UTF8));
            CreateDeliveryTaskRequest request = new CreateDeliveryTaskRequest();
            request.setTableName(tableName);
            request.setTaskName(taskName);
            request.setTaskConfig(taskConfig);
            request.setTaskType(DeliveryTaskType.BASE_INC);
            CreateDeliveryTaskResponse response = client.createDeliveryTask(request);
            System.out.println("resquestID: "+ response.getRequestId());
            System.out.println("traceID: " + response.getTraceId());
            System.out.println("create delivery task success");
        }
}