This topic describes how to use Spark to consume data in Tablestore.
Use Spark to access Tablestore
Create a table named pet in the Tablestore console and set the name field as the primary key.
| name     | owner  | species | sex | birth      | death      |
|----------|--------|---------|-----|------------|------------|
| Fluffy   | Harold | cat     | f   | 1993-02-04 | -          |
| Claws    | Gwen   | cat     | m   | 1994-03-17 | -          |
| Buffy    | Harold | dog     | f   | 1989-05-13 | -          |
| Fang     | Benny  | dog     | m   | 1990-08-27 | -          |
| Bowser   | Diane  | dog     | m   | 1979-08-31 | 1995-07-29 |
| Chirpy   | Gwen   | bird    | f   | 1998-09-11 | -          |
| Whistler | Gwen   | bird    | -   | 1997-12-09 | -          |
| Slim     | Benny  | snake   | m   | 1996-04-29 | -          |
| Puffball | Diane  | hamster | f   | 1999-03-30 | -          |
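If you want to create the pet table programmatically instead of in the console, the following is a minimal sketch that uses the Tablestore Java SDK SyncClient; the endpoint, credentials, and instance name are placeholders that you must replace with your own values.

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.CreateTableRequest;
import com.alicloud.openservices.tablestore.model.PrimaryKeySchema;
import com.alicloud.openservices.tablestore.model.PrimaryKeyType;
import com.alicloud.openservices.tablestore.model.TableMeta;
import com.alicloud.openservices.tablestore.model.TableOptions;

public class CreatePetTable {
    public static void main(String[] args) {
        // Placeholders: replace with your own endpoint, credentials, and instance name.
        SyncClient client = new SyncClient(
                "<yourEndpoint>", "<yourAccessKeyId>",
                "<yourAccessKeySecret>", "<yourInstanceName>");
        try {
            TableMeta meta = new TableMeta("pet");
            // name is the only primary key column, as in the table above.
            meta.addPrimaryKeyColumn(new PrimaryKeySchema("name", PrimaryKeyType.STRING));
            // A time to live of -1 means data never expires; keep at most one version per column.
            TableOptions options = new TableOptions(-1, 1);
            client.createTable(new CreateTableRequest(meta, options));
        } finally {
            client.shutdown();
        }
    }
}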
The following example shows how to consume data in the pet table:
import java.util.ArrayList;
import java.util.Formatter;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// The import paths below assume the Tablestore Java SDK and the EMR Tablestore (Hadoop) connector.
import com.alicloud.openservices.tablestore.model.PrimaryKey;
import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.model.RangeRowQueryCriteria;
import com.aliyun.openservices.tablestore.hadoop.Credential;
import com.aliyun.openservices.tablestore.hadoop.Endpoint;
import com.aliyun.openservices.tablestore.hadoop.PrimaryKeyWritable;
import com.aliyun.openservices.tablestore.hadoop.RowWritable;
import com.aliyun.openservices.tablestore.hadoop.TableStore;
import com.aliyun.openservices.tablestore.hadoop.TableStoreInputFormat;

public class RowCounter {
    // Placeholders: replace with your own credentials and instance information.
    private static final String accessKeyId = "<yourAccessKeyId>";
    private static final String accessKeySecret = "<yourAccessKeySecret>";
    private static final String securityToken = null; // required only for STS temporary credentials
    private static final String endpoint = "<yourEndpoint>";
    private static final String instance = "<yourInstanceName>";

    // Build a range query that scans the whole pet table, from INF_MIN to INF_MAX on the name key.
    private static RangeRowQueryCriteria fetchCriteria() {
        RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
        res.setMaxVersions(1); // read only the latest version of each column
        List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
        List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
        lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
        upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
        res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
        res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
        return res;
    }

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(sparkConf);
            Configuration hadoopConf = new Configuration();
            TableStore.setCredential(
                    hadoopConf,
                    new Credential(accessKeyId, accessKeySecret, securityToken));
            Endpoint ep = new Endpoint(endpoint, instance);
            TableStore.setEndpoint(hadoopConf, ep);
            TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
            // Each record is a (primary key, row) pair read from Tablestore.
            JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
                    hadoopConf, TableStoreInputFormat.class,
                    PrimaryKeyWritable.class, RowWritable.class);
            System.out.println(
                    new Formatter().format("TOTAL: %d", rdd.count()).toString());
        } finally {
            if (sc != null) {
                sc.close();
            }
        }
    }
}
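Counting rows is only one possible action on the RDD. The following is a minimal sketch of transforming the pairs instead, assuming that RowWritable exposes the underlying Tablestore Row through a getRow() method; the OwnerExtractor class and the owner column lookup are illustrative only.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

import com.alicloud.openservices.tablestore.model.Row;
import com.aliyun.openservices.tablestore.hadoop.PrimaryKeyWritable;
import com.aliyun.openservices.tablestore.hadoop.RowWritable;

public class OwnerExtractor {
    // Map each (primary key, row) pair to the value of its owner attribute column.
    public static JavaRDD<String> owners(JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd) {
        return rdd.map(pair -> {
            Row row = pair._2().getRow(); // assumption: RowWritable wraps a Tablestore Row
            return row.getLatestColumn("owner").getValue().asString();
        });
    }
}

For example, owners(rdd).distinct().count() would return the number of distinct owners instead of the total number of rows.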
Use Spark SQL statements to access Tablestore
Sample command that starts the Spark SQL CLI:
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
--hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
--hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
You must configure environment variables before you can run the sample code. For more information about how to configure environment variables, see Configure environment variables.
Note the following items:
- The JARs in /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* provide the tablestore data source type that is used to access Tablestore. If your E-MapReduce (EMR) cluster uses Spark 2, change spark3 in the preceding command to spark2.
- If the error java.lang.ClassNotFoundException: org.apache.commons.net.util.Base64 is reported when you use Spark 3 to access Tablestore, add the commons-net dependency by changing the value of --jars to /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*,/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar.
- If you do not want to pass the --jars parameter each time you execute a statement, add /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar to the values of the spark.driver.extraClassPath and spark.executor.extraClassPath configuration items on the Configure tab of the Spark service page.
The following example shows how to create a table and read data from the table.
create table test_tableStore
using tablestore
options(endpoint = 'https://test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
access.key.id = '${hiveconf:accessKeyId}',
access.key.secret = '${hiveconf:accessKeySecret}',
table.name = 'test_table',
instance.name = 'test_instance',
catalog = '{"columns":{"pk":{"col":"pk","type":"string"},"data":{"col":"data","type":"string"}}}'
);
select * from test_tableStore;
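You can also read the same table through the DataFrame API instead of the SQL CLI. The following is a minimal sketch, assuming that the tablestore data source accepts the same options through the DataFrame reader as it does through the OPTIONS clause above:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TablestoreRead {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("TablestoreRead").getOrCreate();
        Dataset<Row> df = spark.read()
                .format("tablestore") // assumption: the data source name matches the USING clause
                .option("endpoint", "https://test.cn-hangzhou.vpc.tablestore.aliyuncs.com")
                .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
                .option("table.name", "test_table")
                .option("instance.name", "test_instance")
                .option("catalog", "{\"columns\":{\"pk\":{\"col\":\"pk\",\"type\":\"string\"},"
                        + "\"data\":{\"col\":\"data\",\"type\":\"string\"}}}")
                .load();
        df.show();
        spark.stop();
    }
}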
References
For the complete sample code, visit GitHub.
For more information, see Batch computing.