資料準備

在Table Store中準備一張資料表 pet,其中name是唯一的一列主鍵。資料樣本如下:

name owner species sex birth death
Fluffy Harold cat f 1993-02-04
Claws Gwen cat m 1994-03-17
Buffy Harold dog f 1989-05-13
Fang Benny dog m 1990-08-27
Bowser Diane dog m 1979-08-31 1995-07-29
Chirpy Gwen bird f 1998-09-11
Whistler Gwen bird 1997-12-09
Slim Benny snake m 1996-04-29
Puffball Diane hamster f 1999-03-30
说明 表格中空白的部分不需要寫入,因為Table Store是一個 schema-free 的儲存結構(資料模型),沒有值也不需要寫入 NULL

Spark SQL 訪問樣本

前提條件

按照準備工作中的步驟準備好 Spark、JDK 環境以及Table Store Java SDK 和 EMR SDK 的依賴包。

樣本

$ bin/spark-sql --master local --jars tablestore-4.3.1-jar-with-dependencies.jar,emr-tablestore-1.4.2.jar
spark-sql> CREATE EXTERNAL TABLE pet
  (name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING)
  STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
  WITH SERDEPROPERTIES(
    "tablestore.columns.mapping"="name,owner,species,sex,birth,death")
  TBLPROPERTIES (
    "tablestore.endpoint"="YourEndpoint",
    "tablestore.access_key_id"="YourAccessKeyId",
    "tablestore.access_key_secret"="YourAccessKeySecret",
    "tablestore.table.name"="pet");
spark-sql> SELECT * FROM pet;
Bowser  Diane   dog     m       1979-08-31      1995-07-29
Buffy   Harold  dog     f       1989-05-13      NULL
Chirpy  Gwen    bird    f       1998-09-11      NULL
Claws   Gwen    cat     m       1994-03-17      NULL
Fang    Benny   dog     m       1990-08-27      NULL
Fluffy  Harold  cat     f       1993-02-04      NULL
Puffball        Diane   hamster f       1999-03-30      NULL
Slim    Benny   snake   m       1996-04-29      NULL
Whistler        Gwen    bird    NULL    1997-12-09      NULL
Time taken: 5.045 seconds, Fetched 9 row(s)
spark-sql> SELECT * FROM pet WHERE birth > "1995-01-01";
Chirpy  Gwen    bird    f       1998-09-11      NULL
Puffball        Diane   hamster f       1999-03-30      NULL
Slim    Benny   snake   m       1996-04-29      NULL
Whistler        Gwen    bird    NULL    1997-12-09      NULL
Time taken: 1.41 seconds, Fetched 4 row(s)

參數說明如下:

  • WITH SERDEPROPERTIES
    • tablestore.columns.mapping(可選):在預設情況下,外表的欄位名即為Table Store上表的列名(主鍵列名或屬性列名)。但有時外表的欄位名和表上列名並不一致(比如處理大小寫或字元集相關的問題),這時候就需要指定 tablestore.columns.mapping。該參數為一個英文逗號分隔的字串,每個分隔之間不能添加空格,每一項都是表上列名,順序與外表欄位一致。
      说明 Table Store的列名支援空白字元,所以空白也會被認為是表上列名的一部分。
  • TBLPROPERTIES
    • tablestore.endpoint(必填):訪問Table Store的服務地址,也可以在Table Store控制台上查看這個執行個體的 endpoint 資訊。

    • tablestore.instance(可選):Table Store的執行個體名。若不填,則為 tablestore.endpoint 的第一段。

    • tablestore.table.name(必填):Table Store上對應的表名。

    • tablestore.access_key_id、tablestore.access_key_secret(必填) ,請參見存取控制

    • tablestore.sts_token(可選),請參見授權管理

Spark 訪問樣本

以下樣本介紹如何使用 Spark 程式統計資料表 pet 的行數。

private static RangeRowQueryCriteria fetchCriteria() {
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
    res.setMaxVersions(1);
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
    upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);

    Configuration hadoopConf = new Configuration();
    TableStoreInputFormat.setCredential(
        hadoopConf,
        new Credential("YourAccessKeyId", "YourAccessKeySecret"));
    TableStoreInputFormat.setEndpoint(
        hadoopConf,
        new Endpoint("https://YourInstance.Region.ots.aliyuncs.com/"));
    TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());

    try {
        JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
            hadoopConf,
            TableStoreInputFormat.class,
            PrimaryKeyWritable.class,
            RowWritable.class);
        System.out.println(
            new Formatter().format("TOTAL: %d", rdd.count()).toString());
    } finally {
        sc.close();
    }
}
说明 如果使用 scala,只需把 JavaSparkContext 換成 SparkContext,JavaPairRDD 換成 PairRDD 即可。或者更簡單,交給編譯器自行做類型推斷

運行程式

$ bin/spark-submit --master local --jars hadoop-connector.jar row-counter.jar
TOTAL: 9

類型轉換說明

Table Store支援的資料類型和 Hive/Spark 支援的資料類型不完全相同。

下表列出了從Table Store的資料類型(行)轉換到 Hive/Spark 資料類型(列)時所支援的情況。

TINYINT SMALLINT INT BIGINT FLOAT DOUBLE BOOLEAN STRING BINARY
INTEGER 可,損失精度 可,損失精度  可,損失精度 可,損失精度 可,損失精度
DOUBLE 可,損失精度 可,損失精度 可,損失精度 可,損失精度 可,損失精度
BOOLEAN
STRING
BINARY