
E-MapReduce:Use Spark to access Tablestore

Last Updated: Aug 10, 2023

This topic describes how to use Spark to consume data in Tablestore.

Use Spark to access Tablestore

Create a table named pet in the Tablestore console and set the name field as the primary key.

| name     | owner  | species | sex | birth      | death      |
|----------|--------|---------|-----|------------|------------|
| Fluffy   | Harold | cat     | f   | 1993-02-04 | -          |
| Claws    | Gwen   | cat     | m   | 1994-03-17 | -          |
| Buffy    | Harold | dog     | f   | 1989-05-13 | -          |
| Fang     | Benny  | dog     | m   | 1990-08-27 | -          |
| Bowser   | Diane  | dog     | m   | 1979-08-31 | 1995-07-29 |
| Chirpy   | Gwen   | bird    | f   | 1998-09-11 | -          |
| Whistler | Gwen   | bird    | -   | 1997-12-09 | -          |
| Slim     | Benny  | snake   | m   | 1996-04-29 | -          |
| Puffball | Diane  | hamster | f   | 1999-03-30 | -          |

The following example shows how to consume data in the pet table:

private static RangeRowQueryCriteria fetchCriteria() {
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
    res.setMaxVersions(1);
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
    upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}
public static void main(String[] args) {
    // accessKeyId, accessKeySecret, securityToken, endpoint, and instance
    // are assumed to be defined elsewhere in the class.
    SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
    JavaSparkContext sc = null;
    try {
        sc = new JavaSparkContext(sparkConf);
        Configuration hadoopConf = new Configuration();
        TableStore.setCredential(
                hadoopConf,
                new Credential(accessKeyId, accessKeySecret, securityToken));
        Endpoint ep = new Endpoint(endpoint, instance);
        TableStore.setEndpoint(hadoopConf, ep);
        TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
        JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
                hadoopConf, TableStoreInputFormat.class,
                PrimaryKeyWritable.class, RowWritable.class);
        System.out.println(
            new Formatter().format("TOTAL: %d", rdd.count()).toString());
    } finally {
        if (sc != null) {
            sc.close();
        }
    }
}
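A note on the range in fetchCriteria: setInclusiveStartPrimaryKey and setExclusiveEndPrimaryKey define a half-open interval [start, end), and INF_MIN with INF_MAX covers the whole table. The same inclusive-start, exclusive-end semantics can be sketched with a sorted map from the Java standard library (the pet names below mirror the sample table; this is an illustration, not part of the Tablestore SDK):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class RangeSemantics {
    // Half-open scan [startInclusive, endExclusive) over a sorted map,
    // mirroring setInclusiveStartPrimaryKey / setExclusiveEndPrimaryKey.
    public static NavigableMap<String, String> scan(
            TreeMap<String, String> table, String start, String end) {
        return table.subMap(start, true, end, false);
    }

    public static void main(String[] args) {
        // Rows sorted by the primary key ("name"), like a Tablestore table.
        TreeMap<String, String> pets = new TreeMap<>();
        pets.put("Bowser", "dog");
        pets.put("Buffy", "dog");
        pets.put("Claws", "cat");
        pets.put("Fluffy", "cat");

        // The start key is included, the end key is excluded.
        System.out.println(scan(pets, "Buffy", "Fluffy").keySet()); // [Buffy, Claws]
    }
}
```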

Use Spark SQL statements to access Tablestore

Sample SQL statement

spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
  --hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
  --hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
Note
  • You must configure environment variables before you can run the sample code. For more information about how to configure environment variables, see Configure environment variables.

  • The JAR files in /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* provide the data source type that is used to access Tablestore. If your E-MapReduce (EMR) cluster uses Spark 2, you must change spark3 in the preceding statement to spark2.

  • If the error java.lang.ClassNotFoundException: org.apache.commons.net.util.Base64 is reported when you use Spark 3 to access Tablestore, you need to add the commons-net dependency. You can change the value of --jars to /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*,/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar.

  • If you do not want to add the --jars parameter each time you execute a statement, you can add /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar to the values of the spark.driver.extraClassPath and spark.executor.extraClassPath configuration items on the Configure tab of the Spark service page.
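Applied through the last option above, the configuration items would look roughly like the following fragment (the paths are taken verbatim from the note; verify them against your cluster's actual layout before use):

```properties
spark.driver.extraClassPath   /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar
spark.executor.extraClassPath /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar
```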

The following example shows how to create a table and read data from the table.

create table test_tableStore
using tablestore
  options(endpoint = 'https://test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
  access.key.id = '${hiveconf:accessKeyId}',
  access.key.secret = '${hiveconf:accessKeySecret}',
  table.name = 'test_table',
  instance.name = 'test_instance',
  catalog = '{"columns":{"pk":{"col":"pk","type":"string"},"data":{"col":"data","type":"string"}}}'
);

select * from test_tableStore;
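The catalog option in the create table statement above is a JSON document that maps each Spark SQL column to a Tablestore column name and type. If you generate the catalog programmatically, a minimal sketch with only the Java standard library might look like this (the pk and data columns are the illustrative ones from the example, not fixed names):

```java
public class CatalogBuilder {
    // Builds one catalog entry: "col" is the Tablestore column name,
    // "type" is its data type as expected by the data source.
    static String column(String name, String col, String type) {
        return String.format("\"%s\":{\"col\":\"%s\",\"type\":\"%s\"}", name, col, type);
    }

    public static void main(String[] args) {
        String catalog = "{\"columns\":{"
                + column("pk", "pk", "string") + ","
                + column("data", "data", "string")
                + "}}";
        // Matches the catalog string used in the create table statement above.
        System.out.println(catalog);
    }
}
```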

References