
DataWorks: End-to-end intelligent lakehouse data development based on the DataWorks OpenLake solution

Last Updated: Mar 24, 2025

In this experiment, you can experience retail e-commerce data development and analysis scenarios based on the OpenLake House platform, use DataWorks to perform multi-engine collaborative development, and orchestrate workflows and manage data catalogs in a visualized manner. In addition, you can perform Python-based programming and debugging, and use notebooks together with AI to perform interactive data exploration and analysis.

Background information

Overview of DataWorks

DataWorks is an intelligent, lakehouse-based data development and governance platform built on more than 15 years of big data practice at Alibaba Group. It is deeply compatible with dozens of big data and AI computing services provided by Alibaba Cloud, such as MaxCompute, E-MapReduce (EMR), Hologres, Realtime Compute for Apache Flink, and Platform for AI (PAI). DataWorks supports intelligent extract, transform, load (ETL) development, data analysis, and proactive data asset governance for data warehouses, data lakes, and the OpenLake lakehouse architecture, facilitating data management throughout the Data+AI lifecycle. Since 2009, DataWorks has continuously productized and refined the Alibaba data system to serve industries such as the public sector, finance, retail, Internet, automotive, and manufacturing, and has earned the trust of tens of thousands of customers for digital transformation and value creation.

Overview of DataWorks Copilot

DataWorks Copilot is your intelligent assistant in DataWorks. You can use the default large language model (LLM), DeepSeek-R1-671B (full-power edition), or DeepSeek-R1-Distill-Qwen-32B to complete operations in DataWorks. With the advanced reasoning power of DeepSeek-R1, DataWorks Copilot enables you to perform complex tasks, such as SQL code generation, optimization, and testing, through natural language interactions. This greatly improves the efficiency of ETL development and data analysis.

Overview of the DataWorks notebook feature

The notebook feature of DataWorks provides an intelligent, interactive tool for data development and analysis. You can write engine-specific SQL or Python code, run or debug it in real time, and view visualized data processing results. You can also combine notebooks with other types of nodes to form a workflow and commit the workflow to the scheduling system to run, which helps implement complex business scenarios in a flexible manner.
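
For a quick sense of this interactivity, the following minimal Python sketch shows the kind of exploration you can run in a notebook Python cell. The DataFrame in the sketch is a hypothetical stand-in for a query result; in an actual notebook, a preceding SQL cell can bind its result to a DataFrame variable instead, as demonstrated in Step 6.

    import pandas as pd

    # Illustrative stand-in for a query result. In a notebook, a preceding SQL
    # cell can bind its result to a DataFrame variable that you use here instead.
    df = pd.DataFrame({
        "user_id": [1, 2, 3],
        "vip_level": [2, 5, 8],
    })

    # Interactive exploration: inspect the structure and summary statistics.
    print(df.shape)
    print(df.dtypes)
    print(df.describe())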

Usage notes

  • The regions and DataWorks editions in which the DataWorks Copilot public preview is available are limited. For more information, see the Public preview section in the "DataWorks Copilot" topic.

  • To use Python and the notebook feature in Data Studio, you must switch to a personal development environment first.

Limits

  • OpenLake supports only DLF 2.0.

  • Data catalogs support only DLF 2.0.

Environment preparation

  1. Prepare an Alibaba Cloud account or a RAM user.

  2. Create a workspace.

    Note

    Make sure that Participate in Public Preview of Data Studio is turned on for the workspace.

  3. Associate a computing resource with the workspace for which Participate in Public Preview of Data Studio is turned on.

Experiment procedure

Step 1: Manage data catalogs

The data catalog management capability of the data lakehouse solution allows you to create and manage data catalogs of compute engine types such as Data Lake Formation (DLF), MaxCompute, and Hologres.

  1. In the left-side navigation pane of the Data Studio page, click the image icon. The DATA CATALOG pane appears. In the DATA CATALOG pane, find the metadata type that you want to manage, move the pointer over the name of the desired data catalog, click the image icon to the right of the data catalog, and then click Open. The configuration tab of the data catalog appears.

    image

    image

  2. On the configuration tab of the data catalog, click the name of a schema. On the tab that appears, click the name of a table. The details page of the table appears.

    image

  3. In the left-side navigation pane of the Data Studio page, click the image icon. The DATA CATALOG pane appears. In the DATA CATALOG pane, find the metadata type that you want to manage, move the pointer over the name of the desired data catalog, click the image icon to the right of the data catalog, and then click Create Table. The Create Table tab appears.

    image

  4. In the left area of the Create Table tab, specify the table name and field names. Alternatively, enter a DDL statement in the right area of the Create Table tab to create a table. Then, click Deploy in the top toolbar.

    image

    Sample preset code

    CREATE TABLE dwd_mbr_user_info
    (
      id                BIGINT COMMENT 'The primary key'
      ,gmt_create       DATETIME COMMENT 'The creation time'
      ,gmt_modified     DATETIME COMMENT 'The modification time'
      ,user_id          BIGINT COMMENT 'The user ID of a member'
      ,nick             STRING COMMENT ' The nickname of a member'
      ,reg_fullname     STRING COMMENT 'The real name of a person is used if individual verification is performed, and an enterprise name is used if enterprise verification is performed.'
      ,reg_mobile_phone STRING COMMENT 'The mobile phone number entered during registration'
      ,reg_email        STRING COMMENT 'The email address entered during registration (modifiable by the user)'
      ,reg_gender       STRING COMMENT 'The gender entered during registration (F represents female and M represents male. If neither one is entered, the gender is unknown, which indicates that the person wants to keep the gender confidential.)'
      ,reg_birthdate    DATETIME COMMENT 'The birthday entered during registration (modifiable by the user)'
      ,reg_address      STRING COMMENT 'The address entered during registration (modifiable by the user)'
      ,reg_nation_id    STRING COMMENT 'The country ID entered during registration (left empty)'
      ,reg_prov_id      STRING COMMENT 'The province ID entered during registration'
      ,reg_city_id      STRING COMMENT 'The city ID entered during registration'
      ,user_regip       STRING COMMENT 'The IP address entered during registration'
      ,id_card_type     BIGINT COMMENT 'The certificate type used for membership authentication. 0: unknown; 1: ID card number; 2: business license number'
      ,id_card_number   STRING COMMENT 'If individual verification is performed, it indicates the ID card number. If enterprise verification is performed, it indicates the business license number. If no verification is performed, accuracy cannot be guaranteed.'
      ,user_regdate     DATETIME COMMENT 'The registration time'
      ,user_active_type STRING COMMENT 'The user activation method. 1: email; 2: mobile phone'
      ,user_active_time DATETIME COMMENT 'The activation time'
      ,vip_level        STRING COMMENT 'The VIP rank'
      ,is_delete        STRING COMMENT 'Delete or not'
    )
    COMMENT 'The membership registration form'
    PARTITIONED BY 
    (
      ds                STRING COMMENT 'YYYYMMDD'
    )
    LIFECYCLE 7;

Step 2: Orchestrate a workflow

In DataWorks, you can orchestrate various types of data development nodes in a workflow from a business perspective by dragging them in a visualized manner. You do not need to separately configure common parameters such as the scheduling time for each node. This helps you easily manage complex projects.

  1. In the left-side navigation pane of the Data Studio page, click the image icon. The DATASTUDIO pane appears. In the DATASTUDIO pane, click Workspace Directories, click the image icon on the right, and then click Create Workflow.

    image

  2. In the popover that appears, enter a workflow name and press the Enter key.

    Preset workflow name: Retail e-commerce business overview

  3. On the configuration tab of the workflow, click Drag or Click to Add Node in the center of the canvas. In the Create Node dialog box, configure the Node Type and Node Name parameters and click Confirm.

    • Preset node name: Retail e-commerce overview

    • Preset node type: Zero load node

    image

  4. In the Create Node section of the configuration tab of the workflow, find the node type that you want to use, drag the node type to the canvas, and then release. In the Create Node dialog box, specify the node name and click Confirm.

    The following table describes the preset node names and types.

    Node type                                   Node name
    Data Integration - Batch synchronization    ods_mbr_user_info
    MaxCompute - MaxCompute SQL                 dim_ec_mbr_user_info
    MaxCompute - MaxCompute SQL                 dws_ec_mbr_cnt_nd
    Notebook                                    ads_ec_kpi_report

    image

    image

  5. On the canvas, find two nodes for which you want to configure dependencies, and move the pointer over the middle of the lower edge of the rectangular box of one node. When the + icon appears, drag the connection line to connect to the other node and then release.

    image

    image

  6. Create the required nodes in sequence, configure the dependencies for the nodes, and then click Save.

    image

  7. After you save the configurations, you can change the layout mode of the nodes on the canvas in the top toolbar based on your business requirements.

    image

  8. In the right-side navigation pane of the configuration tab of the workflow, click Properties. On the Properties tab, configure the scheduling parameters and node dependencies for the workflow. In the Scheduling Parameters section of the tab, click Add Parameter. In the input box that appears, enter bizdate in the Parameter Name field and select $[yyyymmdd-1] from the Parameter Value drop-down list. A brief sketch after this procedure illustrates how this parameter resolves at run time.

    image

  9. In the Scheduling Dependencies section of the tab, click Add Dependency. In the section that appears, enter ads_ec_ec360_gmv_kpi_overview in the Ancestor Object field and press Enter. Wait for the results to return. Select the ancestor objects that you want from the result list and click Add.

    image

  10. In the top toolbar, click Deploy. The Deployment tab appears in the lower-right corner. Click Start Deployment to Production Environment to the right of the Deployment Description field, and perform the check and confirmation operations based on the on-screen instructions in sequence.

    image
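
For orientation, the bizdate parameter defined in this procedure uses the expression $[yyyymmdd-1], which DataWorks resolves at run time to the day before the scheduled time, formatted as yyyymmdd. Occurrences of ${bizdate} in node code are then replaced with that value. The following minimal Python sketch mimics that substitution; it is illustrative only, because the actual replacement is performed by the DataWorks scheduler.

    from datetime import datetime, timedelta

    def resolve_bizdate(scheduled_time: datetime) -> str:
        # $[yyyymmdd-1]: the scheduled time minus one day, formatted as yyyymmdd.
        return (scheduled_time - timedelta(days=1)).strftime("%Y%m%d")

    # Mimic how ${bizdate} would be substituted into node code at run time.
    bizdate = resolve_bizdate(datetime(2025, 3, 24))
    sql_template = "SELECT * FROM log_ri_base WHERE stat_date >= '${bizdate}'"
    print(sql_template.replace("${bizdate}", bizdate))  # ... >= '20250323'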

Step 3: Perform multi-engine collaborative development

Data Studio supports warehouse development for data synchronization nodes, for nodes of dozens of compute engine types such as MaxCompute, Hologres, EMR, Flink, and ADB, and for nodes developed by using notebooks and Python, together with complex scheduling configurations for these nodes. Data Studio also provides an R&D mode that isolates the development and production environments. In this experiment, a Flink SQL Streaming node is created.

  1. In the left-side navigation pane of the Data Studio page, click the image icon. The DATASTUDIO pane appears. In the DATASTUDIO pane, click Workspace Directories, click the image icon on the right, and then choose Create Node > Flink > Flink SQL Streaming. In the popover that appears, enter a node name and press the Enter key.

    Preset node name: ads_ec_page_visit_log

    image

  2. On the configuration tab of the node, paste the preset code of the Flink SQL Streaming node into the code editor.

    image

    Preset code of the Flink SQL Streaming node

    CREATE TEMPORARY VIEW log_ri_base
    AS 
    SELECT 
      visit_time
      ,substring(visit_time,1,8) as stat_date
      ,substring(visit_time,9,2) as stat_hour
      ,visitor_id
      ,item_id
      ,cate_id
      ,ext_pv
    FROM vvp_ec_ads.dws_log_all_itm_ipv_ri
    WHERE
      bc_type IN ('b', 'z')
      AND coalesce(cate_id, '') <> ''
      AND visitor_type = 'uid'
      and coalesce(item_id, '') <> ''
      AND substring(visit_time,1,8) >= '${bizdate}'
    ;
    
    
    CREATE TEMPORARY VIEW itm_log_day
    AS
    SELECT
      sum(ext_pv) as pv
      ,count(DISTINCT visitor_id) FILTER (WHERE ext_pv>0) as uv
      ,stat_date
      ,cate_id
      ,item_id
    FROM log_ri_base
    GROUP BY stat_date
      ,cate_id
      ,item_id
    ;
    
    
    CREATE TEMPORARY VIEW itm_log_hh_00
    AS
    SELECT
      sum(ext_pv) as pv
      ,count(DISTINCT visitor_id) FILTER (WHERE ext_pv>0) as uv
      ,stat_date
      ,stat_hour
      ,item_id
      ,cate_id
    FROM log_ri_base
    GROUP BY stat_date
      ,stat_hour
      ,cate_id
      ,item_id
    ;
    
    BEGIN STATEMENT SET;
    
    INSERT INTO vvp_ec_ads.ads_ec_log
    SELECT
      a.stat_date
      ,cast(a.item_id as varchar) as item_id
      ,a.cate_id
      ,b.cate_name
      ,cast(b.industry_id as varchar) as industry_id
      ,cast(b.xcat1_id as varchar) as xcat1_id
      ,cast(b.xcat2_id as varchar) as xcat2_id
      ,cast(b.xcat3_id as varchar) as xcat3_id
      ,cast(b.cate_level1_id as varchar) as cate_level1_id
      ,cast(b.cate_level2_id as varchar) as cate_level2_id
      ,cast(b.is_sw as varchar) as is_sw
      ,a.pv as mbr_ipv_1d
      ,a.uv as mbr_ipv_uv_1d
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as log_gmt_modify
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as gmt_modify
    FROM itm_log_day a
    JOIN ec.dim_tm_cate_360_2pt_ri FOR SYSTEM_TIME AS OF PROCTIME() AS b
    ON vvp_dt_rtcdm.DateAddOrSub(a.stat_date, -2) = b.stat_date 
        AND a.cate_id = b.cate_id
    ;
    
    -- Write data.
    INSERT INTO vvp_ec_ads.ads_ec_log_hh
    
    SELECT
      a.stat_date
      ,a.stat_hour
      ,cast(a.item_id as varchar) as item_id
      ,a.cate_id
      ,b.cate_name
      ,cast(b.industry_id as varchar) as industry_id
      ,cast(b.xcat1_id as varchar) as xcat1_id
      ,cast(b.xcat2_id as varchar) as xcat2_id
      ,cast(b.xcat3_id as varchar) as xcat3_id
      ,cast(b.cate_level1_id as varchar) as cate_level1_id
      ,cast(b.cate_level2_id as varchar) as cate_level2_id
      ,cast(b.is_sw as varchar) as is_sw
      ,a.pv as mbr_ipv_1h
      ,a.uv as mbr_ipv_uv_1h
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as log_gmt_modify
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as gmt_modify
    FROM itm_log_hh_00 a
    JOIN ec.dim_tm_cate_360_2pt_ri FOR SYSTEM_TIME AS OF PROCTIME() AS b
    ON vvp_ec_ads.DateAddOrSub(a.stat_date, -2) = b.stat_date 
        AND a.cate_id = b.cate_id
    ;
    
    END;
  3. In the right-side navigation pane of the configuration tab of the node, click Real-time Configurations. On the tab that appears, configure the Flink resource-related parameters, script parameters, and Flink runtime parameters. The following figure shows the parameter values.

    image

    Preset code in expert mode

    {
      "nodes": [
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 1,
          "type": "StreamExecTableSourceScan",
          "desc": "Source: vvp_dt_rtcdm_dwd_tb_trd_ord_pay_nrt_ri[71980]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 2,
          "type": "StreamExecCalc",
          "desc": "Calc[71981]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 3,
          "type": "StreamExecLookupJoin",
          "desc": "LookupJoin[71982]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 4,
          "type": "StreamExecCalc",
          "desc": "Calc[71983]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 6,
          "state": [
            {
              "userDefined": false,
              "name": "groupAggregateState",
              "index": 0,
              "ttl": "36 h"
            }
          ],
          "type": "StreamExecGroupAggregate",
          "desc": "GroupAggregate[71985]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 7,
          "type": "StreamExecCalc",
          "desc": "Calc[71986]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 8,
          "type": "StreamExecSink",
          "desc": "ConstraintEnforcer[71987]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "2"
          },
          "id": 10,
          "state": [
            {
              "userDefined": false,
              "name": "sinkMaterializeState",
              "index": 0,
              "ttl": "36 h"
            }
          ],
          "type": "StreamExecSink",
          "desc": "SinkMaterializer[71987]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "2"
          },
          "id": 11,
          "type": "StreamExecSink",
          "desc": "Sink: vvp_dt_ads_tb_dev_ads_tb_idec_seckill_cate_bc_trd_flow_htr_000[71987]"
        }
      ],
      "vertices": {
        "2d95a2974e3b3137fd533ecfd3490bc5": [
          10,
          11
        ],
        "717c7b8afebbfb7137f6f0f99beb2a94": [
          1,
          2,
          3,
          4
        ],
        "44b79c13fdb45883c7f21ee510155f4d": [
          6,
          7,
          8
        ]
      },
      "edges": [
        {
          "mode": "PIPELINED",
          "source": 1,
          "strategy": "FORWARD",
          "target": 2
        },
        {
          "mode": "PIPELINED",
          "source": 2,
          "strategy": "FORWARD",
          "target": 3
        },
        {
          "mode": "PIPELINED",
          "source": 3,
          "strategy": "FORWARD",
          "target": 4
        },
        {
          "mode": "PIPELINED",
          "source": 4,
          "strategy": "HASH",
          "target": 6
        },
        {
          "mode": "PIPELINED",
          "source": 6,
          "strategy": "FORWARD",
          "target": 7
        },
        {
          "mode": "PIPELINED",
          "source": 7,
          "strategy": "FORWARD",
          "target": 8
        },
        {
          "mode": "PIPELINED",
          "source": 8,
          "strategy": "HASH",
          "target": 10
        },
        {
          "mode": "PIPELINED",
          "source": 10,
          "strategy": "FORWARD",
          "target": 11
        }
      ],
      "ssgProfiles": [
        {
          "managed": {},
          "name": "0",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "992 mb",
          "extended": {}
        },
        {
          "managed": {
            "STATE_BACKEND": "512 mb"
          },
          "name": "1",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "480 mb",
          "extended": {}
        },
        {
          "managed": {
            "STATE_BACKEND": "512 mb"
          },
          "name": "2",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "480 mb",
          "extended": {}
        }
      ]
    }

    Preset other configurations

    blob.fetch.backlog: 1000
    taskmanager.debug.memory.log-interval: 5000
  4. After you configure the parameters on the Real-time Configurations tab, click Save and then Deploy in the top toolbar. The Deployment tab appears in the lower-right corner. Click Start Deployment to Production Environment to the right of the Deployment Description field, and perform the check and confirmation operations based on the on-screen instructions in sequence.

    image

Step 4: Enter the personal development environment

A personal development environment supports custom container images, connections to File Storage NAS (NAS) and Git, Python-based programming, and notebooks.

In the top navigation bar of the Data Studio page, click Select Personal development environment and then select the personal development environment that you want to enter.

image

Step 5: Write and debug Python code

DataWorks is deeply integrated with DSW. Data Studio supports the writing, debugging, scheduling, and running of Python code in a personal development environment.

Important

You can perform operations in this step only after you complete Step 4: Enter the personal development environment.

  1. In the selected personal development environment on the Data Studio page, click workspace under Personal Directory and click the image icon. An unnamed file is added to the list. Enter the preset file name, press the Enter key, and wait for the file to be generated.

    Preset file name: ec_item_rec.py

    image

  2. In the code editor on the configuration tab of the Python file, enter the preset Python code. In the top toolbar, select Run Python File. On the TERMINAL tab, which is in the lower part of the configuration tab, query the running result.

    image

    image

    Preset Python code

    import pandas as pd
    from surprise import Dataset, Reader, SVD
    from surprise.model_selection import train_test_split
    from surprise import accuracy
    
    # Create sample data.
    data_dict = {
        'user_id': [1, 1, 1, 2, 2, 2, 3, 3, 4],
        'item_id': [101, 102, 103, 101, 104, 105, 102, 105, 101],
        'rating': [5, 3, 4, 2, 4, 5, 4, 5, 3]
    }
    
    # Convert data entries into a DataFrame.
    df = pd.DataFrame(data_dict)
    
    # Prepare a dataset by using the Surprise library.
    reader = Reader(rating_scale=(1, 5))
    data = Dataset.load_from_df(df[['user_id', 'item_id', 'rating']], reader)
    
    # Split the dataset into a training set and a test set.
    trainset, testset = train_test_split(data, test_size=0.2)
    
    # Recommend products based on the Singular Value Decomposition (SVD) algorithm.
    model = SVD()
    model.fit(trainset)
    
    # Make predictions.
    predictions = model.test(testset)
    
    # Calculate the root mean square error (RMSE).
    rmse = accuracy.rmse(predictions)
    print(f'RMSE: {rmse:.2f}')
    
    # Obtain the products that are recommended to a user.
    def get_recommendations(user_id, model, all_items, n=3):
        item_ids = all_items['item_id'].unique()
        user_item_col = all_items[(all_items['user_id'] == user_id)]['item_id']
        unseen_items = [item for item in item_ids if item not in user_item_col.values]
    
        # Predict the scores of unseen products.
        predictions = []
        for item in unseen_items:
            pred = model.predict(user_id, item)
            predictions.append((item, pred.est))
    
        # Sort products by the predicted scores.
        predictions.sort(key=lambda x: x[1], reverse=True)
        return predictions[:n]
    
    # Obtain the product recommendations.
    all_items = df
    user_id = 1  # The ID of the user to whom the products are recommended.
    recommendations = get_recommendations(user_id, model, all_items)
    
    print(f'The products that are recommended to {user_id}:')
    for item_id, score in recommendations:
        print(f'Product ID: {item_id}, Predicted score: {score:.2f}')

    Python environment installation

    pip install pandas scikit-surprise
  3. In the top toolbar of the configuration tab of the Python file, select Debug Python File. Move the pointer over a code line number in the code editor. A red dot appears. Click the red dot to add a breakpoint. In the upper-left part above the DATASTUDIO pane, click image to debug the code.

    image
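
As a cross-check of the RMSE that the preset Python code above prints, the metric is the square root of the mean squared difference between the true ratings and the predicted ratings over the test set, which is what accuracy.rmse computes. The following sketch recomputes it; the Prediction tuple here is a stand-in for the prediction objects that the Surprise library returns, which carry the true rating (r_ui) and the estimate (est).

    from collections import namedtuple
    import numpy as np

    # Stand-in for the prediction objects returned by model.test(testset).
    Prediction = namedtuple("Prediction", ["uid", "iid", "r_ui", "est"])

    def manual_rmse(predictions):
        # Root mean squared error over the test-set predictions.
        errors = [(p.r_ui - p.est) ** 2 for p in predictions]
        return float(np.sqrt(np.mean(errors)))

    sample = [Prediction("u1", "i1", 4.0, 3.5), Prediction("u1", "i2", 2.0, 2.4)]
    print(f"RMSE: {manual_rmse(sample):.2f}")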

Step 6: Explore data based on notebooks

When you explore data based on notebooks, the related operations are performed in the personal development environment. Therefore, you must complete Step 4: Enter the personal development environment before you perform operations in this step.

Create a notebook

  1. Go to the Data Studio page.

  2. In the Personal Directory section, right-click the desired folder and select Create Notebook.

  3. An unnamed file is added to the list. Enter the notebook name, and press the Enter key or click a blank area to make the notebook name take effect.

  4. Click the notebook name in the Personal Directory section. The configuration tab of the notebook appears.

Use the notebook

Note

The operations in the Use the notebook section are independent operations without a specific execution order. You can perform the operations based on your business requirements.

  • Multi-engine development in the notebook

    EMR Spark SQL

    1. On the configuration tab of the notebook, click image to add an SQL cell.

    2. In the SQL cell, enter the following statement to query the dim_ec_mbr_user_info table:

      dim_ec_mbr_user_info

      -- Note: Query the basic information about members on an e-commerce platform based on the member information source table and the region source table. 
      USE openlake_win.default;
      SELECT  user.user_id AS user_id
              ,user.nick AS nick
              ,user.gmt_create AS gmt_create
              ,user.gmt_modified AS gmt_modified
              ,user.reg_fullname AS reg_fullname
              ,user.reg_mobile_phone AS reg_mobile_phone
              ,user.reg_email AS reg_email
              ,user.reg_gender AS reg_gender
              ,user.reg_gender_name AS reg_gender_name
              ,user.reg_birthdate AS reg_birthdate
              ,user.reg_address AS reg_address
              ,user.reg_nation_id AS reg_nation_id
              ,user.reg_nation_name AS reg_nation_name
              ,user.reg_prov_id AS reg_prov_id
              ,area_prov.name AS reg_prov_name
              ,user.reg_city_id AS reg_city_id
              ,area_city.name AS reg_city_name
              ,user.user_regip AS user_regip
              ,user.id_card_type AS id_card_type
              ,user.id_card_type_name AS id_card_type_name
              ,user.id_card_number AS id_card_number
              ,null as id_gender
              ,null as id_bday
              ,(2024 - CAST(SUBSTR(user.id_card_number,7,4) AS INT)) AS id_age
              ,user.user_regdate AS user_regdate
              ,user.user_active_type AS user_active_type
              ,user.user_active_name AS user_active_name
              ,user.user_active_time AS user_active_time
              ,user.vip_level AS vip_level
              ,user.vip_level_name AS vip_level_name
              ,user.is_delete AS is_delete
      FROM    (
                  SELECT  id    -- The primary key
                          ,gmt_create    -- The creation time
                          ,gmt_modified    -- The modification time
                          ,user_id    -- The user ID of a member
                          ,nick    -- The nickname of a member
                          ,reg_fullname    -- The real name of a person is used if individual verification is performed and an enterprise name is used if enterprise verification is performed.
                          ,reg_mobile_phone    -- The mobile phone number entered during registration
                          ,reg_email    -- The email address entered during registration (modifiable by the user)
                          ,reg_gender    -- The gender entered during registration (F represents female and M represents male. If neither one is entered, the gender is unknown, which indicates that the person wants to keep the gender confidential.)
                          ,CASE    WHEN reg_gender='F' THEN 'Female'
                                   WHEN reg_gender='M' THEN 'Male' 
                                   ELSE 'Unknown' 
                           END AS reg_gender_name    -- The gender entered during registration (F represents female and M represents male. If neither one is entered, the gender is unknown, which indicates that the person wants to keep the gender confidential.)
                          ,reg_birthdate    -- The birthday entered during registration (modifiable by the user)
                          ,reg_address    -- The address entered during registration (modifiable by the user)
                          ,reg_nation_id    -- The country ID entered during registration (left empty)
                          ,CASE    WHEN reg_nation_id='cn' THEN 'China' 
                                   ELSE 'Areas outside China' 
                           END AS reg_nation_name
                          ,reg_prov_id    -- The province ID entered during registration
                          ,reg_city_id    -- The city ID entered during registration
                          ,user_regip    -- The IP address entered during registration
                          ,id_card_type    -- The certificate type used for membership authentication. 0: unknown; 1: ID card number; 2: business license number
                          ,CASE    WHEN id_card_type=0 THEN 'Unknown'
                                   WHEN id_card_type=1 THEN 'ID card number'
                                   WHEN id_card_type=2 THEN 'Business license number' 
                                   ELSE 'Exception' 
                           END AS id_card_type_name
                          ,id_card_number    -- If individual verification is performed, it indicates the ID card number. If enterprise verification is performed, it indicates the business license number. If no verification is performed, accuracy cannot be guaranteed.
                          ,user_regdate    -- The registration time
                          ,user_active_type    -- The user activation method
                          ,CASE    WHEN user_active_type='email' THEN 'Email'
                                   WHEN user_active_type='mobile_phone' THEN 'Mobile phone' 
                                   ELSE 'Exception' 
                           END AS user_active_name    -- The user activation method
                          ,user_active_time    -- The activation time
                          ,cast(vip_level AS BIGINT) AS vip_level    -- The VIP rank
                          ,CASE    WHEN vip_level>0 AND vip_level<=3 THEN 'Beginner'
                                   WHEN vip_level>3 AND vip_level<=6 THEN 'Intermediate'
                                   WHEN vip_level>6 AND vip_level<=10 THEN 'Advanced' 
                                   WHEN vip_level>10  THEN 'Expert' 
                           ELSE 'Exception'
                           END  AS vip_level_name
                          ,is_delete    -- Delete or not
                  FROM    ods_mbr_user_info
              ) AS USER
      LEFT JOIN (
                    SELECT  id,pid,name,shortname,longitude,latitude,level,sort
                    FROM    ods_t_area
                ) AS area_prov
      ON      user.reg_prov_id = area_prov.id 
      LEFT JOIN    (
                  SELECT  id,pid,name,shortname,longitude,latitude,level,sort
                  FROM    ods_t_area
              ) AS area_city
      ON      user.reg_city_id = area_city.id
      ;
    3. In the lower-right corner of the SQL cell, select EMR Spark SQL and select openlake_serverless_spark as the computing resource.

      image

    4. Click Run. Wait until the running is complete and view the data result.

    StarRocks SQL

    1. On the configuration tab of the notebook, click image to add an SQL cell.

    2. In the SQL cell, enter the following statement to query the dws_ec_trd_cate_commodity_gmv_kpi_fy table:

      dws_ec_trd_cate_commodity_gmv_kpi_fy

      -- Note: Query data metrics, such as "Fiscal year_Amount of successful order payments" and "Fiscal year_Transaction amount completion rate", based on the fact table of transaction orders and the dimension table of the basic information about products.
      USE `openlake_win`.`default`;
      select   t1.cate_id, t1.cate_name, t1.commodity_id, t1.commodity_name, round(10*sum(t1.total_fee),4) as pay_ord_amt_fy, round((10*sum(t1.total_fee)/30000000),4) as kpi_gmv_rate_fy
      from    (
                  select  DATE_FORMAT(a.gmt_create,'yyyymmdd') as stat_date
                          ,a.sub_order_id, a.buyer_id, a.item_id, a.biz_type, a.pay_status, a.total_fee/100 as total_fee, b.cate_id, b.cate_name, b.commodity_id, b.commodity_name 
                  from    `openlake_win`.`default`.dwd_ec_trd_create_ord_di a
                  left outer join (
                                      select  distinct item_id, cate_id, cate_name, commodity_id, commodity_name, shop_id, shop_nick
                                      from    `openlake_win`.`default`.dim_ec_itm_item_info
                                  ) b
                  on      a.item_id = b.item_id
                  and     a.shop_id = b.shop_id
              ) t1
      where   t1.pay_status in ('2')
      and     t1.biz_type in ('2','3','4')
      group by   t1.cate_id, t1.cate_name, t1.commodity_id, t1.commodity_name
      ;
    3. In the lower-right corner of the SQL cell, select StarRocks SQL and select openlake_starrocks as the computing resource.

      image

    4. Click Run. Wait until the running is complete and view the data result.

    Hologres SQL

    1. On the configuration tab of the notebook, click image to add an SQL cell.

    2. In the SQL cell, enter the following statement to query the dws_ec_mbr_cnt_std table:

      dws_ec_mbr_cnt_std

      -- Note: The data in the dimension table of the basic information about members is calculated and converted into data metrics such as "Number of existing members". The data can also be used to obtain the statistics on the "Historical cumulative membership count as of the current date_cube" metric.
      SELECT    IF(grouping(reg_prov_id) = 0, reg_prov_id, '-9999') as reg_prov_id
              , IF(grouping(reg_prov_name) = 0, reg_prov_name, 'All') as reg_prov_name
              , IF(grouping(reg_gender) = 0, reg_gender, '-9999') as reg_gender
              , IF(grouping(reg_gender_name) = 0, reg_gender_name, 'All') as reg_gender_name
              , IF(grouping(age_tag) = 0, age_tag, '-9999') as age_tag
              , IF(grouping(user_active_type) = 0, user_active_type, '-9999') as user_active_type
              , IF(grouping(user_active_name) = 0, user_active_name, 'All') as user_active_name
              , IF(grouping(vip_level) = 0, vip_level, '-9999') as vip_level
              , IF(grouping(vip_level_name) = 0, vip_level_name, 'All') as vip_level_name 
              , count(distinct user_id) as mbr_cnt
      from (
          select    reg_prov_id
                  , reg_prov_name
                  , reg_gender
                  , reg_gender_name
                  , case when cast(substr(reg_birthdate,1,4) as int)>=2010 and cast(substr(reg_birthdate,1,4) as int)<2020 then 'After 2010' 
                          when cast(substr(reg_birthdate,1,4) as int)>=2000 and cast(substr(reg_birthdate,1,4) as int)<2010 then 'After 2000' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1990 and cast(substr(reg_birthdate,1,4) as int)<2000 then 'After 1990' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1980 and cast(substr(reg_birthdate,1,4) as int)<1990 then 'After 1980' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1970 and cast(substr(reg_birthdate,1,4) as int)<1980 then 'After 1970' 
                          else 'Other' 
                    end as age_tag
                  , user_active_type
                  , user_active_name
                  , vip_level
                  , vip_level_name 
                  , user_id
          from    openlake_win.default.dim_ec_mbr_user_info
      ) _main       
      group by 
      grouping sets(
          (reg_prov_id, reg_prov_name)
         ,(reg_gender, reg_gender_name)
         ,(age_tag)
         ,(user_active_type, user_active_name)
         ,(vip_level, vip_level_name)
         ,()
      );
    3. In the lower-right corner of the SQL cell, select Hologres SQL and select openlake_hologres as the computing resource.

      image

    4. Click Run. Wait until the running is complete and view the data result.

    MaxCompute SQL

    1. On the configuration tab of the notebook, click image to add an SQL cell.

    2. In the SQL cell, enter the following statement to query the dws_ec_mbr_cnt_std table:

      dws_ec_mbr_cnt_std

      -- Note: Query the "Historical cumulative membership count as of the current date_cube" table at the light aggregation layer.
      set odps.task.major.version=flighting;
      set odps.namespace.schema=true;
      set odps.sql.allow.namespace.schema=true;
      set odps.service.mode=only;
      set odps.sql.unstructured.data.split.size=1;
      
      SELECT * 
      FROM openlake_win.default.dws_ec_mbr_cnt_std 
      LIMIT 200;
    3. In the lower-right corner of the SQL cell, select MaxCompute SQL and select openlake_maxcompute as the computing resource.

      image

    4. Click Run. Wait until the running is complete and view the data result.

  • Interactive data exploration in the notebook
    1. On the configuration tab of the notebook, click image to add a Python cell.

    2. In the upper-right corner of the Python cell, click image to start DataWorks Copilot, an intelligent programming assistant.

    3. In the input box that appears, enter the following requirement to use ipywidgets to generate a widget that can be used to query members by age.

      Note

      Requirement description: Use Python to generate a slider widget for member age. The value range is 1 to 100, and the default value is 20. Monitor value changes in real time and save the value to the query_age global variable.

    4. Check the Python code generated by DataWorks Copilot and click Accept.

      image

    5. Click the Run icon in the Python cell and wait until the run is complete. The widget is generated by running the code that DataWorks Copilot generated or the preset code below. You can drag the slider to select the desired age.

      Sample code for generating the widget by using ipywidgets

      import ipywidgets as widgets
      
      # Create a slider widget.
      slider = widgets.IntSlider(
          min = 1,
          max = 100,
          value = 20,
          description = 'Age:',
      )
      
      # Define the query_age global variable.
      query_age = None
      
      
      # Define a function to control the change of the slider value.
      def on_slider_change(change):
          global query_age
          query_age = change.new
      
      # Associate the function with the slider value changing event.
      slider.observe(on_slider_change,names='value')
      
      # Display the slider.
      display(slider)

      image

    6. On the configuration tab of the notebook, click image to add an SQL cell.

    7. In the SQL cell, enter the following statement. The statement contains the member age variable ${query_age} that is defined in Python.

      SELECT * FROM openlake_win.default.dim_ec_mbr_user_info
      WHERE CAST(id_age AS INT) >= ${query_age};
    8. In the lower-right corner of the SQL cell, select Hologres SQL and select openlake_hologres as the computing resource.

      image

    9. Click Run. Wait until the running is complete and view the data result.

    10. In the running result, click image to generate a chart.
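
      If you prefer to build a chart yourself in a Python cell instead of using the built-in chart generation, the following minimal sketch plots an age distribution with matplotlib. The DataFrame and the id_age column are assumptions based on the query above; in practice, replace the stand-in DataFrame with the variable that your SQL cell result is bound to.

      import pandas as pd
      import matplotlib.pyplot as plt

      # Stand-in for the query result. In a notebook, bind the SQL cell result
      # to a DataFrame variable and use that variable here instead.
      df = pd.DataFrame({"id_age": [21, 25, 25, 34, 42, 42, 42, 57]})

      # Plot the distribution of member ages returned by the query.
      df["id_age"].astype(int).plot(kind="hist", bins=10, title="Member age distribution")
      plt.xlabel("Age")
      plt.ylabel("Member count")
      plt.show()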

  • Model development and training in the notebook
    1. On the configuration tab of the notebook, click image to add an SQL cell.

    2. In the SQL cell, enter the following statement to query the ods_trade_order table:

      SELECT * FROM openlake_win.default.ods_trade_order;
    3. Write the SQL query results to a DataFrame variable. Click the location where df is displayed to specify a custom name for the DataFrame variable, such as df_ml.

      image

    4. Click the Run icon in the SQL cell, wait until the running is complete, and view the data result.

    5. On the configuration tab of the notebook, click image to add a Python cell.

    6. In the Python cell, enter the following code to cleanse and process the data by using pandas and store the result in a new DataFrame variable named df_ml_clean:

      import pandas as pd
      
      def clean_data(df_ml):
          # Generate a new column: Estimated total order amount = Product unit price × Product quantity.
          df_ml['predict_total_fee'] = df_ml['item_price'].astype(float).values * df_ml['buy_amount'].astype(float).values
          # Rename the total_fee column to actual_total_fee.
          df_ml = df_ml.rename(columns={'total_fee': 'actual_total_fee'})
          return df_ml
      
      df_ml_clean = clean_data(df_ml.copy())
      df_ml_clean.head()
    7. Click the Run icon in the Python cell, wait until the running is complete, and view the data cleansing result.

    8. On the configuration tab of the notebook, click image to add a Python cell again.

    9. In the Python cell, enter the following code to build a linear regression machine learning model and perform training and testing:

      import pandas as pd  
      from sklearn.model_selection import train_test_split  
      from sklearn.linear_model import LinearRegression  
      from sklearn.metrics import mean_squared_error  
        
      # Obtain the product price and total costs.
      X = df_ml_clean[['predict_total_fee']].values  
      y = df_ml_clean['actual_total_fee'].astype(float).values  
      
      # Prepare data.  
      X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42)  
      
      # Create and train a model.  
      model = LinearRegression()  
      model.fit(X_train, y_train)  
        
      # Perform prediction and evaluation.  
      y_pred = model.predict(X_test)  
      for index, (x_t, y_pre, y_t) in enumerate(zip(X_test, y_pred, y_test)):
          print("[{:>2}] input: {:<10} prediction:{:<10} gt: {:<10}".format(str(index+1), f"{x_t[0]:.3f}", f"{y_pre:.3f}", f"{y_t:.3f}"))
      
      # Calculate the mean squared error (MSE).
      mse = mean_squared_error(y_test, y_pred)  
      print("MSE:", mse)
    10. Click Run, wait until the running is complete, and view the test result of the model training.