資料準備
在Table Store中準備一張資料表 pet,其中name
是唯一的一列主鍵。資料樣本如下:
name | owner | species | sex | birth | death |
---|---|---|---|---|---|
Fluffy | Harold | cat | f | 1993-02-04 | |
Claws | Gwen | cat | m | 1994-03-17 | |
Buffy | Harold | dog | f | 1989-05-13 | |
Fang | Benny | dog | m | 1990-08-27 | |
Bowser | Diane | dog | m | 1979-08-31 | 1995-07-29 |
Chirpy | Gwen | bird | f | 1998-09-11 | |
Whistler | Gwen | bird | 1997-12-09 | ||
Slim | Benny | snake | m | 1996-04-29 | |
Puffball | Diane | hamster | f | 1999-03-30 |
说明 表格中空白的部分不需要寫入,因為Table Store是一個 schema-free 的儲存結構(資料模型),沒有值也不需要寫入
NULL
。
Spark SQL 訪問樣本
前提條件
按照準備工作中的步驟準備好 Spark、JDK 環境以及Table Store Java SDK 和 EMR SDK 的依賴包。
樣本
$ bin/spark-sql --master local --jars tablestore-4.3.1-jar-with-dependencies.jar,emr-tablestore-1.4.2.jar
spark-sql> CREATE EXTERNAL TABLE pet
(name STRING, owner STRING, species STRING, sex STRING, birth STRING, death STRING)
STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
WITH SERDEPROPERTIES(
"tablestore.columns.mapping"="name,owner,species,sex,birth,death")
TBLPROPERTIES (
"tablestore.endpoint"="YourEndpoint",
"tablestore.access_key_id"="YourAccessKeyId",
"tablestore.access_key_secret"="YourAccessKeySecret",
"tablestore.table.name"="pet");
spark-sql> SELECT * FROM pet;
Bowser Diane dog m 1979-08-31 1995-07-29
Buffy Harold dog f 1989-05-13 NULL
Chirpy Gwen bird f 1998-09-11 NULL
Claws Gwen cat m 1994-03-17 NULL
Fang Benny dog m 1990-08-27 NULL
Fluffy Harold cat f 1993-02-04 NULL
Puffball Diane hamster f 1999-03-30 NULL
Slim Benny snake m 1996-04-29 NULL
Whistler Gwen bird NULL 1997-12-09 NULL
Time taken: 5.045 seconds, Fetched 9 row(s)
spark-sql> SELECT * FROM pet WHERE birth > "1995-01-01";
Chirpy Gwen bird f 1998-09-11 NULL
Puffball Diane hamster f 1999-03-30 NULL
Slim Benny snake m 1996-04-29 NULL
Whistler Gwen bird NULL 1997-12-09 NULL
Time taken: 1.41 seconds, Fetched 4 row(s)
參數說明如下:
- WITH SERDEPROPERTIES
- tablestore.columns.mapping(可選):在預設情況下,外表的欄位名即為Table Store上表的列名(主鍵列名或屬性列名)。但有時外表的欄位名和表上列名並不一致(比如處理大小寫或字元集相關的問題),這時候就需要指定 tablestore.columns.mapping。該參數為一個英文逗號分隔的字串,每個分隔之間不能添加空格,每一項都是表上列名,順序與外表欄位一致。
说明 Table Store的列名支援空白字元,所以空白也會被認為是表上列名的一部分。
- tablestore.columns.mapping(可選):在預設情況下,外表的欄位名即為Table Store上表的列名(主鍵列名或屬性列名)。但有時外表的欄位名和表上列名並不一致(比如處理大小寫或字元集相關的問題),這時候就需要指定 tablestore.columns.mapping。該參數為一個英文逗號分隔的字串,每個分隔之間不能添加空格,每一項都是表上列名,順序與外表欄位一致。
- TBLPROPERTIES
-
tablestore.endpoint(必填):訪問Table Store的服務地址,也可以在Table Store控制台上查看這個執行個體的 endpoint 資訊。
-
tablestore.instance(可選):Table Store的執行個體名。若不填,則為 tablestore.endpoint 的第一段。
-
tablestore.table.name(必填):Table Store上對應的表名。
-
tablestore.access_key_id、tablestore.access_key_secret(必填) ,請參見存取控制。
-
tablestore.sts_token(可選),請參見授權管理。
-
Spark 訪問樣本
以下樣本介紹如何使用 Spark 程式統計資料表 pet 的行數。
private static RangeRowQueryCriteria fetchCriteria() {
RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
res.setMaxVersions(1);
List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
return res;
}
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Configuration hadoopConf = new Configuration();
TableStoreInputFormat.setCredential(
hadoopConf,
new Credential("YourAccessKeyId", "YourAccessKeySecret"));
TableStoreInputFormat.setEndpoint(
hadoopConf,
new Endpoint("https://YourInstance.Region.ots.aliyuncs.com/"));
TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
try {
JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
hadoopConf,
TableStoreInputFormat.class,
PrimaryKeyWritable.class,
RowWritable.class);
System.out.println(
new Formatter().format("TOTAL: %d", rdd.count()).toString());
} finally {
sc.close();
}
}
说明 如果使用 scala,只需把 JavaSparkContext 換成 SparkContext,JavaPairRDD 換成 PairRDD 即可。或者更簡單,交給編譯器自行做類型推斷
運行程式
$ bin/spark-submit --master local --jars hadoop-connector.jar row-counter.jar
TOTAL: 9
類型轉換說明
Table Store支援的資料類型和 Hive/Spark 支援的資料類型不完全相同。
下表列出了從Table Store的資料類型(行)轉換到 Hive/Spark 資料類型(列)時所支援的情況。
TINYINT | SMALLINT | INT | BIGINT | FLOAT | DOUBLE | BOOLEAN | STRING | BINARY | |
---|---|---|---|---|---|---|---|---|---|
INTEGER | 可,損失精度 | 可,損失精度 | 可,損失精度 | 可 | 可,損失精度 | 可,損失精度 | |||
DOUBLE | 可,損失精度 | 可,損失精度 | 可,損失精度 | 可,損失精度 | 可,損失精度 | 可 | |||
BOOLEAN | 可 | ||||||||
STRING | 可 | ||||||||
BINARY | 可 |