This topic describes how to create cross-database Spark SQL nodes by using the task orchestration feature to periodically synchronize order tables and commodity tables from online databases to Object Storage Service (OSS) buckets. This way, merchants of e-commerce platforms can view the number and amount of orders for each commodity type on the previous day in OSS buckets.

Prerequisites

  • A MySQL database is created as the online database that stores order tables and commodity tables. You are granted the permissions to query data in the MySQL database. For more information about how to apply for permissions, see Overview.
  • An OSS bucket is created and registered in Data Management (DMS) to synchronize data from the tables. For more information about how to register an OSS bucket in DMS, see Register an OSS bucket.

Background information

E-commerce platforms generate large amounts of data. If this data is analyzed directly in online databases, the databases respond more slowly and, in the worst case, stop responding to business requests. To prevent these consequences, business data is usually synchronized to offline databases or storage systems before it is analyzed. If you do not need to synchronize analysis results back to online databases, we recommend that you synchronize your business data to OSS buckets. This way, you can analyze the data and view the analysis results directly in the OSS buckets.

Procedure

  1. Preparations.
  2. Create a cross-database Spark SQL node.
  3. Configure the cross-database Spark SQL node.
  4. Publish the cross-database Spark SQL node.

Preparations

Create an order table and a commodity table in the online MySQL database.

  1. Log on to the DMS console V5.0.
  2. In the top navigation bar, choose SQL Console > SQL Console.
  3. In the Please select the database first dialog box, enter a keyword to search for the MySQL database, select the database from the search results, and then click Confirm.
  4. Create an order table named t_order and a commodity table named t_product in the MySQL database.
    1. Create an order table named t_order. Copy and paste the following SQL statement to the SQL editor and click Execute.
      CREATE TABLE `t_order` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
        `product_id` bigint(20) NOT NULL COMMENT 'Commodity ID',
        `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
        `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Modification time',
        `customer_id` bigint(20) NOT NULL COMMENT 'Customer ID',
        `price` decimal(14,2) NOT NULL COMMENT 'Price',
        `status` varchar(64) NOT NULL COMMENT 'Order status',
        `province` varchar(256) DEFAULT NULL COMMENT 'Province',
        PRIMARY KEY (`id`),
        KEY `idx_product_id` (`product_id`),
        KEY `idx_customer_id` (`customer_id`),
        KEY `idx_status` (`status`)
      ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='Order table'
      ;
    2. Create a commodity table named t_product. Copy and paste the following SQL statement to the SQL editor and click Execute.
      CREATE TABLE `t_product` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
        `name` varchar(128) NOT NULL COMMENT 'Commodity name',
        `type` varchar(64) NOT NULL COMMENT 'Commodity type',
        `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
        `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Modification time',
        PRIMARY KEY (`id`)
      ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='Commodity table'
      ;
  5. Write test data to the t_order table and the t_product table. Use the test data generation feature of DMS to generate the test data. For more information, see Generate test data. Alternatively, for a quick trial, you can manually insert a few sample rows, as shown in the sketch after this list.
    • Generate 20 million rows of test data in the t_order table.
      Note For a database instance managed in Flexible Management mode, you can use the test data generation feature to generate up to 1 million rows of test data per ticket.
    • Generate 10 thousand rows of test data in the t_product table.
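
    The following statements are a minimal sketch for manually inserting sample rows; the values are hypothetical. They assume that both tables are empty, so the two commodity rows receive the IDs 1 and 2, and they set the order creation time to the previous day so that the daily analysis picks the orders up.

      /* Insert two sample commodities. The gmt_create and gmt_modified columns use their default values. */
      INSERT INTO t_product (name, type) VALUES
        ('Sample phone', 'Electronics'),
        ('Sample T-shirt', 'Apparel');

      /* Insert two sample orders created on the previous day. The product_id values 1 and 2
         assume that the commodities above received these IDs. */
      INSERT INTO t_order (product_id, gmt_create, gmt_modified, customer_id, price, status, province) VALUES
        (1, NOW() - INTERVAL 1 DAY, NOW() - INTERVAL 1 DAY, 1001, 199.00, 'PAID', 'Zhejiang'),
        (2, NOW() - INTERVAL 1 DAY, NOW() - INTERVAL 1 DAY, 1002, 59.90, 'PAID', 'Jiangsu');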

Create a cross-database Spark SQL node

  1. Log on to the DMS console V5.0.
  2. In the top navigation bar, click DTS. In the left-side navigation pane, choose Data Development > Task Orchestration.
  3. Create a task flow.
    1. Click Create Task Flow.
    2. In the Create Task Flow dialog box, specify the Task Flow Name and Description parameters and click OK.
  4. In the Task Type list on the left side of the canvas, drag the Cross-Database Spark SQL node to the blank area on the canvas.

Configure the cross-database Spark SQL node

  1. On the details page of the task flow, double-click the cross-database Spark SQL node that you created on the canvas.
  2. On the configuration page, configure the ${today} variable that specifies the current date. For more information about variables, see Variables.
    1. Click the Variable Setting tab in the right-side navigation pane.
    2. In the panel that appears, click the Node Variable tab.
    3. Set the parameters of the variable as required.
  3. Add the OSS bucket to store and analyze data.
    1. In the OSS Reference section, click Add OSS Reference.
    2. Select the required OSS bucket.
      Note If you have not logged on to the OSS bucket, enter your AccessKey ID and AccessKey secret in the Login Instance dialog box that appears.
    3. Specify the path in which data is stored in the OSS bucket.
      Note
      • If the path that you specify does not exist, the system automatically creates the path.
      • You can use a variable to specify a path. Example: /path/${foldername}.
    4. Set the alias of the OSS bucket that you want to reference in Spark SQL statements. In this example, set the alias to oss.
    5. Click Save.
  4. Add the MySQL database that stores order tables and commodity tables.
    1. In the Database Reference section, click Add Database Reference.
    2. Set the Database Type, Database, and Alias in Spark SQL parameters. In this example, set the Alias in Spark SQL parameter to demo_id. For more information about the parameters, see Step 8 in Configure a cross-database Spark SQL node.
      Note If you have not logged on to the database, enter the database account and password in the Login Instance dialog box that appears.
    3. Click Save.
  5. Enter Spark SQL statements in the SQL editor and click Save.
    /* Use the Spark SQL syntax to write SQL statements. A table is referenced as alias.table_name. */
    
    /* Create a table named t_order in the OSS bucket and add dt as a partition field. */
    CREATE TABLE oss.t_order (
      id bigint COMMENT 'Primary key',
      product_id bigint  COMMENT 'Commodity ID',
      gmt_create timestamp  COMMENT 'Creation time',
      gmt_modified timestamp  COMMENT 'Modification time',
      customer_id bigint COMMENT 'Customer ID',
      price decimal(38,8)  COMMENT 'Price',
      status string COMMENT 'Order status',
      province string COMMENT 'Province',
      dt string comment 'Data timestamp partition'
    )  partitioned by (dt) COMMENT 'Order table';
    
    insert overwrite oss.t_order partition(dt='${bizdate}')
    select id, product_id, gmt_create, gmt_modified, customer_id, price, status, province 
    from demo_id.t_order o 
    where o.gmt_create>= '${bizdate}' and o.gmt_create< '${today}';
    
    /* Create a table named t_product in the OSS bucket. */
    CREATE TABLE oss.t_product (
      id bigint COMMENT 'Primary key',
      name string COMMENT 'Commodity name',
      type string COMMENT 'Commodity type',
      gmt_create timestamp  COMMENT 'Creation time',
      gmt_modified timestamp  COMMENT 'Modification time'
    )   COMMENT 'Commodity table';
    /* Synchronize full data from the demo_id.t_product table to the oss.t_product table. */
    insert overwrite oss.t_product 
    select id, name, type, gmt_create, gmt_modified 
    from demo_id.t_product; 
    
    /* Create a table named t_order_report_daily in the OSS bucket and add dt as a partition field. */
    CREATE TABLE oss.t_order_report_daily(
       dt string  comment 'Data timestamp partition',
       product_type string  comment 'Commodity type',
       order_cnt bigint  comment 'Order quantity',
       order_amt decimal(38, 8)  comment 'Order amount'
    )  partitioned by (dt) comment 'Orders per day';
    
    /* Insert data by partition. */
    insert overwrite oss.t_order_report_daily partition(dt='${bizdate}')
    select
           p.type as product_type,
           count(*)  order_cnt,
           sum(price)  order_amt
      from oss.t_product p join oss.t_order o on o.product_id= p.id
     where o.gmt_create>= '${bizdate}'
       and o.gmt_create< '${today}'
     group by product_type;
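
    To see how the variables drive the daily synchronization, the following sketch resolves the first INSERT statement for a hypothetical run. It assumes that ${bizdate} resolves to the previous day and ${today} to the run date, both in the yyyy-MM-dd format; the actual values depend on the variables that you configured.

    /* Hypothetical example: for a run on 2024-06-02 with ${bizdate} = 2024-06-01 and
       ${today} = 2024-06-02, the first INSERT statement is equivalent to the following
       statement, which synchronizes only the orders of the previous day. */
    insert overwrite oss.t_order partition(dt='2024-06-01')
    select id, product_id, gmt_create, gmt_modified, customer_id, price, status, province
    from demo_id.t_order o
    where o.gmt_create >= '2024-06-01' and o.gmt_create < '2024-06-02';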
                            

    You can store objects in OSS buckets in the CSV, Parquet, ORC, or JSON format. By default, the objects are stored in the CSV format. You can use the USING clause to specify a format in the CREATE TABLE statement.

    The following example shows how to specify the Parquet format:

    CREATE TABLE oss.t_order (
      id bigint COMMENT 'Primary key',
      product_id bigint  COMMENT 'Commodity ID',
      gmt_create timestamp  COMMENT 'Creation time',
      gmt_modified timestamp  COMMENT 'Modification time',
      customer_id bigint COMMENT 'Customer ID',
      price decimal(38,8)  COMMENT 'Price',
      status string COMMENT 'Order status',
      province string COMMENT 'Province',
      dt string comment 'Data timestamp partition'
    )  USING PARQUET partitioned by (dt) COMMENT 'Order table';

Publish the cross-database Spark SQL node

  1. On the details page of the task flow, click Try Run in the upper-left corner of the canvas.
    Click the Execution Logs tab to view the execution results.
    • If the SUCCEEDED status appears in the last line of the logs, the task flow is run as expected.
    • If the FAILED status appears in the last line of the logs, the task flow fails to run.
      Note If the task flow fails to run, check the logs to identify the node on which the failure occurred and the cause of the failure. Then, modify the configuration of the node and try again.
    After the task flow is successfully run, you can return to the homepage of the DMS console to view the results. In the left-side navigation pane, find the database instance that you want to query, right-click the name of the OSS bucket, and then click Query to query the data stored in the OSS bucket. For a sample query, see the sketch after this procedure.
  2. Enable scheduling for the task flow.
    1. Click the blank area on the canvas.
    2. Click the Task Flow Information tab.
    3. In the Scheduling Settings section, turn on Enable Scheduling. For more information, see Configure a task flow.
  3. Publish the task flow. After the task flow is published, it is automatically run based on the specified scheduling cycle.
    1. Click Publish in the upper-left corner of the canvas.
    2. In the Publish dialog box, set the Remarks parameter and click OK.
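
The following query is a minimal sketch for verifying the report data in the SQL console after you open the OSS bucket for queries. It assumes the t_order_report_daily table created by the node; the partition value 2024-06-01 is a hypothetical example, so replace it with the data timestamp of the run that you want to check.

  -- Hypothetical verification query for the daily report table.
  SELECT dt, product_type, order_cnt, order_amt
  FROM t_order_report_daily
  WHERE dt = '2024-06-01'
  ORDER BY order_amt DESC;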