This topic describes how to use the ods_user_info_d_spark and ods_raw_log_d_spark external tables, which are created based on Spark SQL, to access the basic user information and user website access logs that are synchronized to a private Object Storage Service (OSS) bucket. It also describes how to use E-MapReduce (EMR) Spark SQL nodes to process the synchronized data into the desired user profile data. This topic helps you understand how to use Spark SQL to compute and analyze synchronized data and complete simple data warehouse processing.
Prerequisites
Before you start this tutorial, complete the steps in Synchronize data.
Step 1: Establish a data processing link
In the data synchronization phase, the required data is loaded by using Spark. The next objective is to further process the data to generate the basic user profile data.
Log on to the DataWorks console and go to the DATA STUDIO pane of the Data Studio page. In the Workspace Directories section of the DATA STUDIO pane, find the prepared User_profile_analysis_Spark workflow and click the workflow name to go to the configuration tab of the workflow.
Drag EMR SPARK SQL from the EMR section of the configuration tab to the canvas on the right. In the Create Node dialog box, configure the Node Name parameter.
The following table lists node names that are used in this tutorial and the functionalities of the nodes.
Node type | Node name | Node functionality
EMR Spark SQL | dwd_log_info_di_spark | Processes data in the ods_raw_log_d_spark table by using Spark SQL and synchronizes the processed data to the dwd_log_info_di_spark table.
EMR Spark SQL | dws_user_info_all_di_spark | Joins the fact log table dwd_log_info_di_spark with the basic user information table ods_user_info_d_spark on the uid field and synchronizes the aggregated result to the dws_user_info_all_di_spark table.
EMR Spark SQL | ads_user_info_1d_spark | Further processes data in the dws_user_info_all_di_spark table and synchronizes the result to the ads_user_info_1d_spark table to generate a basic user profile.

Draw lines to configure ancestor nodes for the EMR Spark SQL nodes, as shown in the following figure.
Note: You can draw lines to configure scheduling dependencies for nodes in a workflow. You can also use the automatic parsing feature to enable the system to automatically identify scheduling dependencies between nodes. In this tutorial, scheduling dependencies between nodes are configured by drawing lines. For information about the automatic parsing feature, see Use the automatic parsing feature.
Step 2: Configure data processing nodes
After the workflow is configured, use the EMR Spark SQL nodes to process data in the basic user information table and fact log table to generate the initial user profile table ads_user_info_1d_spark.
Configure the dwd_log_info_di_spark node
In the sample code for this node, Spark SQL functions are used to split and parse fields from the ancestor table ods_raw_log_d_spark and then write the parsed fields to the dwd_log_info_di_spark table.
In the canvas of the workflow, move the pointer over the dwd_log_info_di_spark node and click Open Node.
Copy the following SQL statements and paste them in the code editor:
-- Scenario: The SQL statements in the following sample code are Spark SQL statements. You can split data in the ods_raw_log_d_spark table that is loaded to Spark by using the ##@@ delimiter to generate multiple fields, and then synchronize the fields to the dwd_log_info_di_spark table.
-- Note:
-- DataWorks provides scheduling parameters that you can use to write daily incremental data to the corresponding partition of the destination table in the scheduling scenario.
-- In actual development scenarios, you can define variables in the node code in the ${Variable name} format. Then, you can assign scheduling parameters as values to the variables on the Properties tab of the configuration tab of the node. This way, the values of scheduling parameters are dynamically replaced in the node code based on the configurations of the scheduling parameters in the scheduling scenario.
CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark (
  ip STRING COMMENT 'The IP address',
  uid STRING COMMENT 'The user ID',
  tm STRING COMMENT 'The time in the yyyymmddhh:mi:ss format',
  status STRING COMMENT 'The status code that is returned by the server',
  bytes STRING COMMENT 'The number of bytes that are returned to the client',
  method STRING COMMENT 'The request method',
  url STRING COMMENT 'url',
  protocol STRING COMMENT 'The protocol',
  referer STRING,
  device STRING,
  identity STRING
)
PARTITIONED BY (dt STRING)
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dwd_log_info_di_spark/log_${bizdate}/';

ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt = '${bizdate}')
SELECT ip,
       uid,
       tm,
       status,
       bytes,
       regexp_extract(request, '(^[^ ]+) .*', 1) AS method,
       regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url,
       regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol,
       regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer,
       CASE
         WHEN lower(agent) RLIKE 'android' THEN 'android'
         WHEN lower(agent) RLIKE 'iphone' THEN 'iphone'
         WHEN lower(agent) RLIKE 'ipad' THEN 'ipad'
         WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh'
         WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone'
         WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc'
         ELSE 'unknown'
       END AS device,
       CASE
         WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
         WHEN lower(agent) RLIKE 'feed'
              OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed'
         WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
              AND agent RLIKE '^(Mozilla|Opera)'
              AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user'
         ELSE 'unknown'
       END AS identity
FROM (
  SELECT SPLIT(col, '##@@')[0] AS ip,
         SPLIT(col, '##@@')[1] AS uid,
         SPLIT(col, '##@@')[2] AS tm,
         SPLIT(col, '##@@')[3] AS request,
         SPLIT(col, '##@@')[4] AS status,
         SPLIT(col, '##@@')[5] AS bytes,
         SPLIT(col, '##@@')[6] AS referer,
         SPLIT(col, '##@@')[7] AS agent
  FROM ods_raw_log_d_spark
  WHERE dt = '${bizdate}'
) a;

Note: If you use the preceding SQL statements, you must replace the value of LOCATION based on your business requirements. dw-spark-demo is the domain name of the OSS bucket that you created when you prepare an environment.
In the right-side navigation pane of the configuration tab of the node, click Debugging Configurations. On the Debugging Configurations tab, configure the following parameters. These parameters are used when you test the workflow in Step 3.
Parameter | Description
Computing Resource | Select the Spark computing resource that is associated with the workspace when you prepare environments.
Resource Group | Select the serverless resource group that is associated with the workspace when you prepare environments.
Script Parameters | In the Parameter Value column of the bizdate parameter, enter a value in the yyyymmdd format. Example: bizdate=20250223. When you debug the workflow, Data Studio replaces the variables defined for nodes in the workflow with this constant.

Optional. Configure scheduling properties.
You can retain the default values for parameters related to scheduling properties in this tutorial. If you want to modify the configurations of scheduling properties, click Properties in the right-side navigation pane of the configuration tab of the node. For information about other parameters on the Properties tab, see Node scheduling.
Scheduling Parameters: In this tutorial, scheduling parameters are configured for the workflow. You do not need to configure scheduling parameters for the inner nodes of the workflow. The configured scheduling parameters can be directly used for code and tasks developed based on the inner nodes.
Scheduling Policies: You can configure the Time for Delayed Execution parameter to specify the duration by which the running of the node lags behind the running of the workflow. In this tutorial, you do not need to configure this parameter.
Click Save.
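If you want to sanity-check the splitting and parsing logic before you run the node, you can execute a standalone query such as the following on the SQL Query page. The sample log line and its values are hypothetical and only illustrate the field order (ip, uid, tm, request, status, bytes, referer, agent) that the node code assumes.

-- Hypothetical sample record: verify that SPLIT with the ##@@ delimiter returns the fields in the expected order
-- and that the regular expression extracts the URL from the request field.
SELECT SPLIT(line, '##@@')[0] AS ip,
       SPLIT(line, '##@@')[1] AS uid,
       SPLIT(line, '##@@')[3] AS request,
       regexp_extract(SPLIT(line, '##@@')[3], '^[^ ]+ (.*) [^ ]+$', 1) AS url
FROM (
  SELECT '192.168.0.1##@@0016359810821##@@2025-02-23 14:05:32##@@GET /index.html HTTP/1.1##@@200##@@1234##@@https://example.com/##@@Mozilla/5.0' AS line
) t;

If the delimiter and field order match your raw log format, the query returns /index.html in the url column.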
Configure the dws_user_info_all_di_spark node
This node is used to aggregate the basic user information table ods_user_info_d_spark and the fact log table dwd_log_info_di_spark and synchronize the aggregation result to the dws_user_info_all_di_spark table.
In the canvas of the workflow, move the pointer over the dws_user_info_all_di_spark node and click Open Node.
Copy the following SQL statements and paste them in the code editor:
-- Scenario: The SQL statements in the following sample code are Spark SQL statements. You can join the dwd_log_info_di_spark and ods_user_info_d_spark tables based on the uid field and write data to the specified dt partition.
-- Note:
-- DataWorks provides scheduling parameters that you can use to write daily incremental data to the corresponding partition of the destination table in the scheduling scenario.
-- In actual development scenarios, you can define variables in the node code in the ${Variable name} format. Then, you can assign scheduling parameters as values to the variables on the Properties tab of the configuration tab of the node. This way, the values of scheduling parameters are dynamically replaced in the node code based on the configurations of the scheduling parameters in the scheduling scenario.
CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark (
  uid STRING COMMENT 'The user ID',
  gender STRING COMMENT 'The gender',
  age_range STRING COMMENT 'The age range',
  zodiac STRING COMMENT 'The zodiac sign',
  device STRING COMMENT 'The terminal type',
  method STRING COMMENT 'The HTTP request type',
  url STRING COMMENT 'url',
  `time` STRING COMMENT 'The time in the yyyymmddhh:mi:ss format'
)
PARTITIONED BY (dt STRING)
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/dws_user_info_all_di_spark/log_${bizdate}/';

-- Add a partition.
ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

-- Insert data from the basic user information table and the fact log table.
INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid,
       b.gender AS gender,
       b.age_range AS age_range,
       b.zodiac AS zodiac,
       a.device AS device,
       a.method AS method,
       a.url AS url,
       a.tm
FROM (
  SELECT * FROM dwd_log_info_di_spark WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
  SELECT * FROM ods_user_info_d_spark WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;

Note: If you use the preceding SQL statements, you must replace the value of LOCATION based on your business requirements. dw-spark-demo is the domain name of the OSS bucket that you created when you prepare an environment.
In the right-side navigation pane of the configuration tab of the node, click Debugging Configurations. On the Debugging Configurations tab, configure the following parameters. These parameters are used when you test the workflow in Step 3.
Parameter | Description
Computing Resource | Select the Spark computing resource that is associated with the workspace when you prepare environments.
Resource Group | Select the serverless resource group that is associated with the workspace when you prepare environments.
Script Parameters | In the Parameter Value column of the bizdate parameter, enter a value in the yyyymmdd format. Example: bizdate=20250223. When you debug the workflow, Data Studio replaces the variables defined for nodes in the workflow with this constant.

Optional. Configure scheduling properties.
You can retain the default values for parameters related to scheduling properties in this tutorial. If you want to modify the configurations of scheduling properties, click Properties in the right-side navigation pane of the configuration tab of the node. For information about other parameters on the Properties tab, see Node scheduling.
Scheduling Parameters: In this tutorial, scheduling parameters are configured for the workflow. You do not need to configure scheduling parameters for the inner nodes of the workflow. The configured scheduling parameters can be directly used for code and tasks developed based on the inner nodes.
Scheduling Policies: You can configure the Time for Delayed Execution parameter to specify the duration by which the running of the node lags behind the running of the workflow. In this tutorial, you do not need to configure this parameter.
Click Save.
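Because the node uses a LEFT OUTER JOIN with the fact log table on the left side, log records whose uid has no match in ods_user_info_d_spark are kept in the result with NULL profile fields. If you want to check how many aggregated records lack basic user information, you can run a query such as the following after the node finishes. The partition value 20250223 is only an example; replace it with the data timestamp that you use.

-- Count aggregated records that have no matching basic user information.
SELECT COUNT(*) AS missing_profile_cnt
FROM dws_user_info_all_di_spark
WHERE dt = '20250223'
  AND gender IS NULL;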
Configure the ads_user_info_1d_spark node
This node is used to further process data in the dws_user_info_all_di_spark table and synchronize data from the table to the ads_user_info_1d_spark table to generate a basic user profile.
In the canvas of the workflow, move the pointer over the ads_user_info_1d_spark node and click Open Node.
Copy the following SQL statements and paste them in the code editor:
-- Scenario: The SQL statements in the following sample code are Spark SQL statements. You can further process the dws_user_info_all_di_spark table in Spark SQL and synchronize data to the ads_user_info_1d_spark table.
-- Note:
-- DataWorks provides scheduling parameters that you can use to write daily incremental data to the corresponding partition of the destination table in the scheduling scenario.
-- In actual development scenarios, you can define variables in the node code in the ${Variable name} format. Then, you can assign scheduling parameters as values to the variables on the Properties tab of the configuration tab of the node. This way, the values of scheduling parameters are dynamically replaced in the node code based on the configurations of the scheduling parameters in the scheduling scenario.
CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark (
  uid STRING COMMENT 'The user ID',
  device STRING COMMENT 'The terminal type',
  pv BIGINT COMMENT 'pv',
  gender STRING COMMENT 'The gender',
  age_range STRING COMMENT 'The age range',
  zodiac STRING COMMENT 'The zodiac sign'
)
PARTITIONED BY (dt STRING)
LOCATION 'oss://dw-spark-demo.oss-cn-shanghai-internal.aliyuncs.com/ads_user_info_1d_spark/log_${bizdate}/';

ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}');

INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt = '${bizdate}')
SELECT uid,
       MAX(device),
       COUNT(0) AS pv,
       MAX(gender),
       MAX(age_range),
       MAX(zodiac)
FROM dws_user_info_all_di_spark
WHERE dt = '${bizdate}'
GROUP BY uid;

Note: If you use the preceding SQL statements, you must replace the value of LOCATION based on your business requirements. dw-spark-demo is the domain name of the OSS bucket that you created when you prepare an environment.
In the right-side navigation pane of the configuration tab of the node, click Debugging Configurations. On the Debugging Configurations tab, configure the following parameters. These parameters are used when you test the workflow in Step 3.
Parameter | Description
Computing Resource | Select the Spark computing resource that is associated with the workspace when you prepare environments.
Resource Group | Select the serverless resource group that is associated with the workspace when you prepare environments.
Script Parameters | In the Parameter Value column of the bizdate parameter, enter a value in the yyyymmdd format. Example: bizdate=20250223. When you debug the workflow, Data Studio replaces the variables defined for nodes in the workflow with this constant.

Optional. Configure scheduling properties.
You can retain the default values for parameters related to scheduling properties in this tutorial. If you want to modify the configurations of scheduling properties, click Properties in the right-side navigation pane of the configuration tab of the node. For information about other parameters on the Properties tab, see Node scheduling.
Scheduling Parameters: In this tutorial, scheduling parameters are configured for the workflow. You do not need to configure scheduling parameters for the inner nodes of the workflow. The configured scheduling parameters can be directly used for code and tasks developed based on the inner nodes.
Scheduling Policies: You can configure the Time for Delayed Execution parameter to specify the duration by which the running of the node lags behind the running of the workflow. In this tutorial, you do not need to configure this parameter.
Click Save.
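After the node finishes, you can explore the generated profile data, for example by aggregating page views by device type. The following query is only an illustrative example based on the columns defined above; replace the partition value with the data timestamp that you use.

-- Number of users and total page views per device type for one partition.
SELECT device,
       COUNT(*) AS user_cnt,
       SUM(pv)  AS total_pv
FROM ads_user_info_1d_spark
WHERE dt = '20250223'
GROUP BY device
ORDER BY total_pv DESC;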
Step 3: Process data
Synchronize data.
In the top toolbar of the configuration tab of the workflow, click Run. In the Enter runtime parameters dialog box, specify a value that is used for the scheduling parameters defined for each node in this run, and click OK. In this tutorial, 20250223 is specified. You can specify a value based on your business requirements.
Query the data processing result.
Go to the SQL Query page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, click Go to DataAnalysis. In the left-side navigation pane of the page that appears, click SQL Query.
Configure an SQL query file.
Click the icon next to My Files and select Create File. In the Create File dialog box, configure the File Name parameter and click OK. In the left-side navigation tree, find the created SQL file to go to the configuration tab of the file.
In the upper-right corner of the configuration tab, click the icon. In the popover that appears, configure the following parameters.

Parameter | Description
Workspace | Select the workspace to which the User_profile_analysis_Spark workflow belongs.
Data Source Type | Select EMR Spark SQL from the drop-down list.
Data Source Name | Select the EMR Serverless Spark computing resource that is associated with the workspace when you prepare environments.
Click OK.
Write an SQL statement for the query.
After all nodes in this topic are successfully run, write and execute the following SQL statement to check whether external tables are created based on the EMR Spark SQL nodes as expected.
-- You must update the partition filter condition to the data timestamp of your current operation. For example, if a node is scheduled to run on February 23, 2025, the data timestamp of the node is 20250222, which is one day earlier than the scheduling time of the node.
SELECT * FROM dwd_log_info_di_spark WHERE dt = 'The data timestamp';
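You can run a similar check against the downstream tables to confirm that each processing layer produced data for the same partition. The following statement is only an example; replace 'The data timestamp' in the same way as above.

-- Compare row counts across the three processing layers for one data timestamp.
SELECT 'dwd_log_info_di_spark' AS table_name, COUNT(*) AS row_cnt
FROM dwd_log_info_di_spark WHERE dt = 'The data timestamp'
UNION ALL
SELECT 'dws_user_info_all_di_spark', COUNT(*)
FROM dws_user_info_all_di_spark WHERE dt = 'The data timestamp'
UNION ALL
SELECT 'ads_user_info_1d_spark', COUNT(*)
FROM ads_user_info_1d_spark WHERE dt = 'The data timestamp';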
Step 4: Deploy the workflow
Nodes must be deployed to the production environment before they can be automatically scheduled. You can follow the steps below to deploy the workflow to production.
In this tutorial, scheduling parameters have been configured globally in the workflow-level scheduling settings. You do not need to configure scheduling parameters for individual nodes before publishing.
1. In the left-side navigation pane of Data Studio, click the icon to go to the Data Studio page.
2. In the Workspace Directories, find the workflow you created and click it to open the workflow configuration page.
3. In the node toolbar, click Deploy.
4. Click Start Deployment to Production Environment and follow the guided steps to complete the process.
Step 5: Run the nodes in the production environment
After a task is deployed, its instance will be generated and run on the following day. You can perform backfill operations to verify whether the published workflow can run as expected in the production environment. For more information, see Backfill data and view data backfill instances (new version).
After the nodes are deployed, click Operation Center in the top navigation bar of the Data Studio page.
You can also click the icon in the upper-left corner of the DataWorks console and choose . In the left-side navigation pane of the Operation Center page, choose . On the Auto Triggered Nodes page, find the zero load node workshop_start_spark and click the node name.
In the directed acyclic graph (DAG) of the node, right-click the workshop_start_spark node and choose .
In the Backfill Data panel, select the nodes for which you want to backfill data, configure the Data Timestamp parameter, and then click Submit and Redirect.
In the upper part of the Data Backfill page, click Refresh to check whether the workshop_start_spark node and its descendant nodes are successfully run.
To prevent extra charges after you complete the operations in this tutorial, you can configure the Effective Period parameter for all nodes in the workflow or freeze the zero load node workshop_start_spark.
More operations
Display data in a visualized manner: After you complete user profile analysis, use DataAnalysis to display the processed data in charts. This helps you quickly extract key information to gain insights into the business trends behind the data.
Monitor data quality: Configure monitoring rules for tables that are generated after data processing to help identify and intercept dirty data in advance to prevent the impacts of dirty data from escalating.
Manage data: Data tables are generated in Spark after a user profile analysis task is complete. You can view the generated data tables in Data Map and view the relationships between the tables based on data lineages.
Use DataService Studio APIs to provide services: After you obtain the final processed data, use standardized APIs in DataService Studio to share data and to provide data for other business modules that use APIs to receive data.