
Stream computing

Last Updated: Nov 29, 2021

You can use Spark SQL in an E-MapReduce (EMR) cluster to access Tablestore. Based on Tunnel Service, EMR uses change data capture (CDC) to provide micro-batch stream consumption and computing for Spark, with at-least-once semantics.

Prerequisites

  • An EMR Hadoop cluster is created. For more information, see Create a cluster.

    When you create the cluster, make sure that Assign Public IP Address is enabled so that the cluster can be accessed over the Internet, and that Remote Logon is enabled so that you can log on to the server by using Shell.

    Note

    This topic uses Shell commands. For more information about how to use the graphical interfaces of EMR to implement data development, see Manage projects.

  • The emr-datasources_shaded_2.11-2.2.0-SNAPSHOT.jar package is uploaded to the EMR Header server.

Quick start

  1. Create tables and tunnels in Tablestore.

    1. Create a Source table in the Tablestore console. For more information, see Quick start of Tablestore.

      The name of the Source table is OrderSource. The primary key columns are UserId and OrderId. The attribute columns are price and timestamp.

    2. Create a Sink table in the Tablestore console.

      The name of the Sink table is OrderStreamSink. The primary key columns are begin and end. The attribute columns are count and totalPrice. The start time and end time are in the yyyy-MM-dd HH:mm:ss format. Example: 2019-11-27 14:54:00.

    3. Create a tunnel for the Source table. For more information, see Quick start.

      The name, ID, and type of the tunnel are displayed in the Tunnels section. The tunnel ID is used for stream processing.

  2. Create a Spark external table on the EMR cluster side.

    1. Log on to the EMR Header server.

    2. Run the following command to start the Spark SQL command-line interface (CLI). You can use the CLI to create an external table in Spark SQL and perform SQL operations.

      The standard parameters for starting Spark are --num-executors 32 --executor-memory 2g --executor-cores 2. You can adjust these values based on your cluster configuration. <Version> specifies the version of the uploaded JAR package. Example: 2.2.0-SNAPSHOT.

      spark-sql --jars emr-datasources_shaded_2.11-<Version>.jar --master yarn --num-executors 32 --executor-memory 2g --executor-cores 2
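
      After the CLI starts, you can run a simple statement such as the following to confirm that the session works (any valid Spark SQL statement serves the same purpose):

      SHOW TABLES;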
    3. Create an external Source table, which is named order_source. This table corresponds to the OrderSource table in Tablestore.

      • Parameters

        Parameter          Description
        -----------------  ------------------------------------------------------------------
        endpoint           The endpoint of the Tablestore instance. The EMR cluster uses a
                           virtual private cloud (VPC) address to access the instance.
        access.key.id      The AccessKey ID of your Alibaba Cloud account.
        access.key.secret  The AccessKey secret of your Alibaba Cloud account.
        instance.name      The name of the Tablestore instance.
        table.name         The name of the Tablestore data table.
        catalog            The schema of the Tablestore data table, in JSON format.

      • Example

        DROP TABLE IF EXISTS order_source;
        CREATE TABLE order_source
        USING tablestore
        OPTIONS(
        endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
        access.key.id="",
        access.key.secret="",
        instance.name="vehicle-test",
        table.name="OrderSource",
        catalog='{"columns": {"UserId": {"type": "string"}, "OrderId": {"type": "string"},"price": {"type": "double"}, "timestamp": {"type": "long"}}}'
        );    
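
      After the external table is created, you can run a batch query to verify the mapping (a quick sanity check; the column names come from the catalog defined above):

        SELECT UserId, OrderId, price, timestamp FROM order_source LIMIT 10;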
  3. Perform real-time stream computing.

    Real-time stream computing aggregates the number of orders and the total order amount within each time window and writes the results to the Tablestore data table.

    1. Create an external Sink table, which is named order_stream_sink. This table corresponds to the OrderStreamSink table in Tablestore.

      The parameter configurations for creating an external Sink table and an external Source table differ only in the catalog field.

    2. Create a view on the order_source table.

      The ID of the tunnel that you created for the Source table in Tablestore is required when you create the view.

    3. Run a Stream SQL job to aggregate the data in real time and write the results to the OrderStreamSink table in Tablestore, as shown in the following example.

    -- Create an external Sink table named order_stream_sink that corresponds to the OrderStreamSink table in Tablestore.
    DROP TABLE IF EXISTS order_stream_sink;
    CREATE TABLE order_stream_sink
    USING tablestore
    OPTIONS(
    endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
    access.key.id="",
    access.key.secret="",
    instance.name="vehicle-test",
    table.name="OrderStreamSink",
    catalog='{"columns": {"begin": {"type": "string"},"end": {"type": "string"}, "count": {"type": "long"}, "totalPrice": {"type": "double"}}}'
    );
    
    -- Create a view named order_source_stream_view on the order_source table.
    -- tunnel.id is the ID of the tunnel created on the Source table. maxoffsetsperchannel caps
    -- the amount of data that each tunnel channel consumes per micro-batch.
    CREATE SCAN order_source_stream_view ON order_source USING STREAM
    OPTIONS(
    tunnel.id="4987845a-1321-4d36-9f4e-73d6db63bf0f",
    maxoffsetsperchannel="10000");
    
    -- Run a Stream SQL job on order_source_stream_view. The following example aggregates the
    -- number of orders and the order amount at an interval of 30 seconds, and writes the
    -- aggregate results to the OrderStreamSink table in Tablestore in real time.
    CREATE STREAM job1
    OPTIONS(
    checkpointLocation='/tmp/spark/cp/job1',
    outputMode='update'
    )
    INSERT INTO order_stream_sink
    SELECT CAST(window.start AS STRING) AS begin,
           CAST(window.end AS STRING) AS end,
           count(*) AS count,
           CAST(sum(price) AS DOUBLE) AS totalPrice
    FROM order_source_stream_view
    GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");

    You can obtain the aggregate results after you run Stream SQL. The aggregate results are saved in the OrderStreamSink table. The following figure shows an example of the aggregate results.
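
    To inspect the aggregate results, you can query the Sink external table from the same Spark SQL session (a minimal batch query; the column names come from the Sink table's catalog):

    SELECT begin, end, count, totalPrice FROM order_stream_sink ORDER BY begin;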