
Tablestore:Batch computing

Last Updated:Jan 19, 2024

You can use Apache Spark 2 in an E-MapReduce (EMR) cluster to access Tablestore. For batch computing, the Tablestore Spark connector allows you to select indexes, prune partitions, push down projection columns and filter conditions, and dynamically specify partition sizes. In addition, the connector can use the global secondary index or search index feature of Tablestore to accelerate queries.

Prerequisites

  • An EMR Hadoop cluster is created. For more information, see Quick start for EMR.

    When you create the EMR cluster, configure the cluster parameters based on the following configurations and your business requirements:

    • Business Scenario: Select Custom Cluster from the drop-down list.

    • Optional Services: Select Spark2, Hive, YARN, Hadoop-Common, and HDFS.

    • Metadata: Select Built-in MySQL.

    In addition, make sure that the Assign Public Network IP parameter is set to On for the master node group. Use the default values for other parameters.

    Important

    If you do not set the Assign Public Network IP parameter to On, you cannot access the cluster over the Internet. To access the cluster that you created over the Internet, go to the Elastic Compute Service (ECS) console to apply for an elastic IP address (EIP).

  • Roles are assigned to EMR by using your Alibaba Cloud account. For more information, see Assign roles.

  • A Resource Access Management (RAM) user is created, and the AliyunOTSFullAccess permission is granted to the RAM user to manage Tablestore. For more information, see Configure user permissions.

    Important

    To protect the AccessKey pair of your Alibaba Cloud account, we recommend that you create a RAM user, grant the required permissions to the RAM user, and then use the AccessKey pair of the RAM user to access Tablestore.

  • An AccessKey pair that consists of an AccessKey ID and an AccessKey secret is obtained. For more information, see Obtain an AccessKey pair.

Connect to a global secondary index

After Spark is connected to a Tablestore table and its global secondary index, the system selects the base table or an index table for a query based on the columns in the query conditions.

Step 1: Create a table or a global secondary index in Tablestore

  1. Create a Tablestore table. For more information, see Use Tablestore.

    In this example, the name of the table is tpch_lineitem_perf. The primary key columns are the l_orderkey column of the LONG type and the l_linenumber column of the LONG type. The 14 attribute columns include the l_comment column of the STRING type, the l_commitdate column of the STRING type, the l_discount column of the DOUBLE type, the l_extendedprice column of the DOUBLE type, the l_linestatus column of the STRING type, the l_partkey column of the LONG type, the l_quantity column of the DOUBLE type, and the l_receiptdate column of the STRING type. The number of data entries is 384,016,850. The following figure shows an example of the data entries.

    fig_00a

  2. Optional. Create a global secondary index on the table. For more information, see Use secondary indexes in the Tablestore console.

    Note

    To use non-primary key columns in the query conditions, we recommend that you create a global secondary index to accelerate queries.

    A global secondary index allows you to create an index table based on specified columns. Data in the generated index table is sorted by the specified indexed columns. All data written to the base table is automatically synchronized to the index table in an asynchronous manner.

    fig_00b
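To picture why the index helps, here is a toy Python model (illustrative only, not the connector's internals): because an index table is sorted by its indexed columns, an equality lookup becomes a binary search over the index instead of a scan over the base table.

```python
from bisect import bisect_left, bisect_right

# Toy model (not connector internals): a base table keyed by primary key,
# plus a "secondary index" that maps an attribute column back to primary keys.
base_table = {pk: {"l_shipdate": d} for pk, d in
              enumerate(["1996-06-06", "1996-06-07", "1996-06-06", "1996-06-08"])}

# Index table: rows sorted by the indexed column, as Tablestore sorts an
# index table by its indexed columns.
index_rows = sorted((row["l_shipdate"], pk) for pk, row in base_table.items())
keys = [d for d, _ in index_rows]

def query_with_index(value):
    # Binary search over the sorted index instead of scanning the base table.
    lo, hi = bisect_left(keys, value), bisect_right(keys, value)
    return [pk for _, pk in index_rows[lo:hi]]

def query_full_scan(value):
    return [pk for pk, row in base_table.items() if row["l_shipdate"] == value]

assert query_with_index("1996-06-06") == query_full_scan("1996-06-06")
```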

Step 2: Create a Spark external table in the EMR cluster

  1. Connect to the master node.

  2. Run the following command to start the Spark SQL CLI. You can use the CLI to create a Spark external table and execute SQL statements.

    spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/*

    If the startup succeeds, the Spark SQL CLI prompt is displayed.

  3. Connect to the global secondary index when you create the Spark external table.

    • Parameters

      • endpoint: The endpoint of the Tablestore instance. We recommend that you use a virtual private cloud (VPC) endpoint. If you use the VPC endpoint, make sure that the EMR cluster can connect to the Tablestore instance over the network. For more information, see Endpoints.

      • access.key.id and access.key.secret: The AccessKey ID and AccessKey secret of your Alibaba Cloud account or a RAM user. For more information, see Create an AccessKey pair.

      • instance.name: The name of the Tablestore instance.

      • table.name: The name of the Tablestore table.

      • split.size.mbs: The size of each split. Unit: MB. Default value: 100.

      • max.split.count: The maximum number of splits calculated for the table, which equals the maximum number of concurrent Spark tasks. Default value: 1000.

      • catalog: The schema of the table, in JSON format.

    • Example

      DROP TABLE IF EXISTS tpch_lineitem;
      CREATE TABLE tpch_lineitem
      USING tablestore
      OPTIONS(endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
              access.key.id="",
              access.key.secret="",
              instance.name="vehicle-test",
              table.name="tpch_lineitem_perf",
              split.size.mbs=10,
              max.split.count=1000,
              catalog='{"columns":{"l_orderkey":{"type":"long"},"l_partkey":{"type":"long"},"l_suppkey":{"type":"long"},"l_linenumber":{"type":"long"},"l_quantity":{"type":"double"},"l_extendedprice":{"type":"double"},"l_discount":{"type":"double"},"l_tax":{"type":"double"},"l_returnflag":{"type":"string"},"l_linestatus":{"type":"string"},"l_shipdate":{"type":"string"},"l_commitdate":{"type":"string"},"l_receiptdate":{"type":"string"},"l_shipinstruct":{"type":"string"},"l_shipmode":{"type":"string"},"l_comment":{"type":"string"}}}'
      );
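The catalog option is verbose to write by hand. The following Python sketch builds the catalog JSON from a column-to-type mapping (the column list is copied from the example above) and estimates how split.size.mbs and max.split.count bound the number of splits; the split formula is an assumption for illustration, not the connector's exact logic.

```python
import json
import math

# Columns of the tpch_lineitem_perf table from the example above.
columns = {
    "l_orderkey": "long", "l_partkey": "long", "l_suppkey": "long",
    "l_linenumber": "long", "l_quantity": "double", "l_extendedprice": "double",
    "l_discount": "double", "l_tax": "double", "l_returnflag": "string",
    "l_linestatus": "string", "l_shipdate": "string", "l_commitdate": "string",
    "l_receiptdate": "string", "l_shipinstruct": "string", "l_shipmode": "string",
    "l_comment": "string",
}

# Build the value for the catalog option: a JSON object whose "columns" key
# maps each column name to an object with its type.
catalog = json.dumps({"columns": {name: {"type": t} for name, t in columns.items()}})

# Assumed formula (not the connector's exact logic): the table is cut into
# splits of about split.size.mbs MB each, capped at max.split.count.
def estimate_splits(table_size_mb, split_size_mbs=100, max_split_count=1000):
    return min(math.ceil(table_size_mb / split_size_mbs), max_split_count)
```

The resulting catalog string can be pasted into the OPTIONS clause of the CREATE TABLE statement above.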

Step 3: Perform SQL queries

The following section provides examples of SQL queries for different business requirements:

  • Full table query

    • SQL statement: SELECT COUNT(*) FROM tpch_lineitem;

    • Durations of three executions of the SQL statement: 36.199s, 34.711s, and 34.801s. Average duration: 35.237s.

  • Primary key-based query

    • SQL statement: SELECT COUNT(*) FROM tpch_lineitem WHERE l_orderkey = 1 AND l_linenumber = 1;

    • Average duration of the GetRow operation on the Tablestore server: 0.585 ms.

  • Non-primary key column-based query without using the global secondary index

    • SQL statement: SELECT count(*) FROM tpch_lineitem WHERE l_shipdate = '1996-06-06';

    • Durations of three executions of the SQL statement: 37.006s, 37.269s, and 37.17s. Average duration: 37.149s.

  • Non-primary key column-based query by using the global secondary index

    • SQL statement: SELECT count(*) FROM tpch_lineitem WHERE l_shipdate = '1996-06-06';

    • Durations of three executions of the SQL statement when the global secondary index is created on the l_shipdate column: 1.686s, 1.651s, and 1.784s. Average duration: 1.707s.

Connect to a search index

After Spark is connected to a Tablestore table and its search index, the system selects the base table or the search index for a query based on the columns in the query conditions.

Step 1: Create a table and a search index in Tablestore

  1. Create a table. For more information, see Use Tablestore.

    In this example, the name of the table is geo_table. The primary key column is the pk1 column of the STRING type. The attribute columns are the val_keyword1 column of the STRING type, the val_keyword2 column of the STRING type, the val_keyword3 column of the STRING type, the val_bool column of the BOOLEAN type, the val_double column of the DOUBLE type, the val_long1 column of the LONG type, the val_long2 column of the LONG type, the val_text column of the STRING type, and the val_geo column of the STRING type. The number of data entries is 208,912,382. The following figure shows an example of the data entries.

    fig_002

  2. Create a search index on the table. For more information, see the Step 1: Create a search index section of the "Use the Tablestore console" topic.

    When you create a search index, configure mappings for the search index based on field types.

    Note

    When you create a search index, select Geographical Location instead of STRING as the data type of the GEOPOINT field in the search index.

    fig_001

    After you create a search index, it synchronizes data from the table. When the search index enters the incremental synchronization state, the creation is complete and the index is ready for queries.

    fig_searchindex_batch_list

Step 2: Create a Spark external table in the EMR cluster

  1. Connect to the master node.

  2. Connect to the search index when you create the Spark external table.

    • Parameters

      • endpoint: The endpoint of the Tablestore instance. We recommend that you use a VPC endpoint. If you use the VPC endpoint, make sure that the EMR cluster can connect to the Tablestore instance over the network. For more information, see Endpoints.

      • access.key.id and access.key.secret: The AccessKey ID and AccessKey secret of your Alibaba Cloud account or a RAM user. For more information, see Create an AccessKey pair.

      • instance.name: The name of the Tablestore instance.

      • table.name: The name of the Tablestore table.

      • search.index.name: The name of the search index.

      • max.split.count: The maximum number of concurrent parallel scan tasks on the search index, which equals the maximum number of splits (concurrent tasks) in Spark.

      • push.down.range.long: Specifies whether to push down predicates that compare LONG column values by using the >=, >, <, or <= operator. Type: BOOLEAN. Default value: true. If you set this parameter to false, such predicates are evaluated in Spark instead of being pushed down to the search index. For more information, see Configure predicate pushdown for batch computing.

      • push.down.range.string: Specifies whether to push down predicates that compare STRING column values by using the >=, >, <, or <= operator. Type: BOOLEAN. Default value: true. If you set this parameter to false, such predicates are evaluated in Spark instead of being pushed down to the search index. For more information, see Configure predicate pushdown for batch computing.

    • Example

      DROP TABLE IF EXISTS geo_table;
      CREATE TABLE geo_table (
          pk1 STRING, val_keyword1 STRING, val_keyword2 STRING, val_keyword3 STRING, 
          val_bool BOOLEAN, val_double DOUBLE, val_long1 LONG, val_long2 LONG,
          val_text STRING, val_geo STRING COMMENT "geo stored in string format"
      )
      USING tablestore
      OPTIONS(endpoint="https://sparksearchtest.cn-hangzhou.vpc.tablestore.aliyuncs.com",
              access.key.id="",
              access.key.secret="",
              instance.name="sparksearchtest",
              table.name="geo_table",
              search.index.name="geo_table_index",
              max.split.count=64,
              push.down.range.long = false,
              push.down.range.string = false
      );
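The following Python sketch illustrates how the push.down.range.long and push.down.range.string options gate whether a range predicate is sent to the search index or left for Spark to filter. The decision logic is an assumption for illustration, not the connector's implementation; in particular, the handling of non-range predicates and of other column types is assumed here.

```python
# Illustrative sketch (assumed logic, not the connector's implementation) of
# how the push.down.range.long and push.down.range.string options gate
# pushdown of range predicates.
RANGE_OPS = {">", ">=", "<", "<="}

def pushes_down(column_type, op, options):
    """Return True if the predicate is assumed to be pushed down to the search index."""
    if op not in RANGE_OPS:
        return True  # assumption: equality and other supported predicates are pushed down
    if column_type == "long":
        return options.get("push.down.range.long", True)
    if column_type == "string":
        return options.get("push.down.range.string", True)
    return False  # assumption: range predicates on other types stay in Spark

opts = {"push.down.range.long": False, "push.down.range.string": False}
assert pushes_down("long", "=", opts) is True   # equality still pushed down
assert pushes_down("long", ">", opts) is False  # range predicate filtered in Spark
```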

Step 3: Perform SQL queries

The following section provides examples of SQL queries for different business requirements:

  • Full table query by using the search index

    • SQL statement: SELECT COUNT(*) FROM geo_table;

    • Actual duration for executing the SQL statement: 165.208s. Average QPS: 1,264,500. Data entries for testing: 208,912,382. Number of concurrent tasks during parallel scan: 64.

      208912382
      Time taken: 165.208 seconds, Fetched 1 row(s)
      20/06/29 20:55:11 INFO [main] SparkSQLCLIDriver: Time taken: 165.208 seconds, Fetched 1 row(s)
  • Boolean query

    • SQL statement: SELECT val_long1, val_long2, val_keyword1, val_double FROM geo_table WHERE (val_long1 > 17183057 AND val_long1 < 27183057) AND (val_long2 > 1000 AND val_long2 < 5000) LIMIT 100;

    • Actual duration for executing the SQL statement: 2.728s. Spark pushes the Projection column and filter conditions to the search index, which improves query efficiency.

      21423964        4017    aaa     2501.9901650365096
      21962236        2322    eio     2775.9021545044116
      Time taken: 2.894 seconds, Fetched 100 row(s)
      20/06/30 18:51:24 INFO [main] SparkSQLCLIDriver: Time taken: 2.894 second
  • Geo query

    Geo queries support the following query types: geo-distance query, geo-bounding box query, and geo-polygon query. In the examples, val_geo is the name of the GEOPOINT field, and a coordinate pair of a geographical location is in the "latitude,longitude" format.

    • Geo-distance query

      Syntax: val_geo = '{"centerPoint":"The coordinate pair of the central point.", "distanceInMeter": The distance from the central point.}'

      SQL statement: SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"centerPoint":"6.530045901643962,9.05358919674954", "distanceInMeter": 3000.0}';

    • Geo-bounding box query

      Syntax: val_geo = '{"topLeft":"The coordinate of the upper-left corner in the rectangular area.", "bottomRight":"The coordinate of the lower-right corner in the rectangular area."}'

      SQL statement: SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"topLeft":"6.257664116603074,9.1595116589601", "bottomRight":"6.153593333442616,9.25968497923747"}';

    • Geo-polygon query

      Syntax: val_geo = '{"points":["Coordinate Pair 1", "Coordinate Pair 2", ..., "Coordinate Pair n"]}'

      SQL statement: SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"points":["6.530045901643962,9.05358919674954", "6.257664116603074,9.1595116589601", "6.160393397574926,9.256517839929597", "6.16043846779313,9.257192872563525"]}';
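The geo query payloads above are plain JSON strings, so they can be assembled programmatically. A minimal Python sketch follows; the helper names are illustrative, and only the JSON shapes come from the syntax above.

```python
import json

# Illustrative helpers (hypothetical names) that build the three geo query
# payloads used as the right-hand side of: WHERE val_geo = '<payload>'.
def geo_distance(center_point, distance_in_meter):
    # Geo-distance query: central point plus a radius in meters.
    return json.dumps({"centerPoint": center_point,
                       "distanceInMeter": distance_in_meter})

def geo_bounding_box(top_left, bottom_right):
    # Geo-bounding box query: upper-left and lower-right corners.
    return json.dumps({"topLeft": top_left, "bottomRight": bottom_right})

def geo_polygon(points):
    # Geo-polygon query: list of "latitude,longitude" coordinate pairs.
    return json.dumps({"points": points})

payload = geo_distance("6.530045901643962,9.05358919674954", 3000.0)
sql = f"SELECT COUNT(*) FROM geo_table WHERE val_geo = '{payload}'"
```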