
DataWorks OpenLake: one-stop intelligent data lakehouse development

Last Updated: Oct 28, 2025

In this experiment, you will walk through a retail e-commerce data development and analysis scenario in an OpenLake data lakehouse environment. You will use DataWorks for multi-engine collaborative development, visual workflow orchestration, and data catalog management. You will also practice Python programming and debugging, and use a Notebook for AI-powered interactive data exploration and analysis.

Background

Introduction to DataWorks

DataWorks is an intelligent platform for data lakehouse development and governance, built on 15 years of Alibaba's big data engineering practice. It is deeply integrated with dozens of Alibaba Cloud big data and AI computing services, such as MaxCompute, E-MapReduce, Hologres, Flink, and PAI. DataWorks provides intelligent extract, transform, and load (ETL) development, data analysis, and proactive data asset governance for data warehouses, data lakes, and OpenLake data lakehouse architectures, helping you manage the entire "Data+AI" lifecycle. Since 2009, DataWorks has continuously productized Alibaba's data architecture. It serves industries including government, finance, retail, Internet, automotive, and manufacturing, and tens of thousands of customers use it to drive digital transformation and create value.

Introduction to DataWorks Copilot

DataWorks Copilot is your intelligent assistant in DataWorks. You can choose the DataWorks default model, Qwen3-235B-A22B, DeepSeek-R1-0528, or the Qwen3-Coder large model to power Copilot operations. With the deep reasoning capabilities of DeepSeek-R1, DataWorks Copilot helps you perform complex operations such as SQL code generation, optimization, and testing through natural language interaction. This significantly improves ETL development and data analysis efficiency.

Introduction to DataWorks Notebook

DataWorks Notebook is an intelligent, interactive tool for data development and analysis. It supports SQL or Python analysis across multiple data engines. You can run or debug code instantly and view visualized data results. A DataWorks Notebook can also be orchestrated with other task nodes into a workflow and submitted to the scheduling system for execution. This lets you flexibly implement complex business scenarios.

Usage notes

  • The DataWorks Copilot public preview is subject to regional and version limitations. For more information, see Usage notes.

  • To use Python and Notebooks in DataStudio, you must first switch to the personal development environment.

Limitations

  • OpenLake only supports Data Lake Formation (DLF) 2.0.

  • The data catalog only supports Data Lake Formation (DLF) 2.0.

  • The Qwen3-235B-A22B and DeepSeek-R1 models are available in the following regions: China (Hangzhou), China (Shanghai), China (Beijing), China (Zhangjiakou), China (Shenzhen), and China (Chengdu).

  • The Qwen3-Coder model is available in China (Hangzhou), China (Shanghai), China (Beijing), China (Zhangjiakou), China (Ulanqab), China (Shenzhen), and China (Chengdu).

Prerequisites

  1. You have prepared an Alibaba Cloud account or a RAM user.

  2. You have created a workspace.

    Note

    Select Join the public preview of Data Development (DataStudio) (New).

  3. You have attached a computing resource.

Procedure

Step 1: Manage the data catalog

The data catalog management feature of the data lakehouse lets you manage and create data catalogs for services such as DLF, MaxCompute, and Hologres.

  1. In DataStudio, click the image icon in the left-side menu to open Data Catalog. In the navigation pane, find the metadata type that you want to manage and click Add Project. The name of this button may vary depending on the metadata type. This topic uses MaxCompute as an example.

    You can add data sources from your DataWorks workspace. You can also select a MaxCompute project for which you have permissions on the MaxCompute-Project tab.

    image

  2. After you add the project, it appears under the corresponding metadata type. Click the project name to go to the data catalog details page.

  3. On the data catalog details page, select a schema and click any table name to view the table details.

  4. The data catalog lets you create tables visually.

    Expand the data catalog to the Table level of the specified schema. Click the image icon on the right to open the Create Table page.

    image

  5. On the Create Table page, you can create a table in the following ways:

    • In area ①, enter the Table Name and Field Information.

    • In area ②, enter the DDL statement directly to create the table.

    image

  6. Click the Publish button at the top of the page to create the table.

Step 2: Orchestrate a workflow

A workflow lets you orchestrate various types of data development nodes by dragging and dropping them based on your business logic. You do not need to configure common parameters, such as scheduling time, for each node. This helps you easily manage complex task projects.

  1. In DataStudio, click the image icon in the primary menu on the left to open Data Development. In the navigation pane on the left, find Project Folder, click the image icon next to it, and select New Workflow.

  2. Enter a Name for the workflow and click OK to open the workflow editor.

  3. In the workflow editor, click Drag Or Click To Add A Node on the canvas. In the Add Node dialog box, set Node Type to Zero Load Node, enter a custom Node Name, and then click Confirm.

  4. From the list of node types on the left, find the required node type and drag it onto the canvas. In the Add Node dialog box, enter a Node Name and click Confirm.

    image

  5. On the canvas, find the two nodes for which you want to create a dependency. Hover your mouse over the middle of the bottom edge of one node. When the pointer changes to a +, drag the arrow to the other node and release the mouse. After you set the dependency as shown in the figure, click Save on the top toolbar.

    image

  6. After you click Save, you can adjust the canvas layout as needed.

    image

  7. On the right side of the workflow canvas, click Scheduling Configuration. Use the Scheduling Configuration panel to configure the scheduling parameters and node dependencies for the workflow. In the Scheduling Parameters section, click Add Parameter. In the parameter name field, enter bizdate. From the parameter value drop-down list, select $[yyyymmdd-1].

    image

  8. Click Use Workspace Root Node to set the workspace root node as the upstream dependency of the workflow.

    image

  9. Click Publish above the workflow canvas. The Publish Content panel appears in the lower-right corner. In the panel, click Start Publishing To Production and follow the prompts to confirm.

    image
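
The scheduling parameter bizdate with the value $[yyyymmdd-1] resolves to the day before the scheduled run date, formatted as an eight-digit yyyymmdd string. As a rough illustration only (plain Python, not the DataWorks scheduler itself), the substitution behaves like:

```python
from datetime import date, timedelta

def resolve_bizdate(run_date: date) -> str:
    """Mimic the scheduling parameter $[yyyymmdd-1]:
    the day before the run date, formatted as yyyymmdd."""
    return (run_date - timedelta(days=1)).strftime("%Y%m%d")

print(resolve_bizdate(date(2025, 1, 1)))  # 20241231
```

Downstream nodes can then reference ${bizdate} in their code, for example in a WHERE clause that filters on the business date.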

Step 3: Use multi-engine collaborative development

DataStudio supports data warehouse development for dozens of node types across different engines, such as Data Integration, MaxCompute, Hologres, EMR, Flink, Python, Notebook, and ADB. It supports complex scheduling dependencies and provides a development model that isolates the development environment from the production environment. This experiment uses the creation of a Flink SQL Streaming node as an example.

  1. In DataStudio, click the image icon in the navigation pane on the left to open the Data Development page. In the navigation pane, find Project Folder and click the image icon next to it. In the cascading menu, click Flink SQL Streaming, enter a Node Name, and press Enter to open the node editor.

    Preset node name: ads_ec_page_visit_log.

    image

  2. In the node editor, paste the preset Flink SQL Stream code into the code editor.

    image

    Preset Flink SQL Stream code

    CREATE TEMPORARY VIEW log_ri_base
    AS 
    SELECT 
      visit_time
      ,substring(visit_time,1,8) as stat_date
      ,substring(visit_time,9,2) as stat_hour
      ,visitor_id
      ,item_id
      ,cate_id
      ,ext_pv
    FROM vvp_ec_ads.dws_log_all_itm_ipv_ri
    WHERE
      bc_type IN ('b', 'z')
      AND coalesce(cate_id, '') <> ''
      AND visitor_type = 'uid'
      AND coalesce(item_id, '') <> ''
      AND substring(visit_time,1,8) >= '${bizdate}'
    ;
    
    
    CREATE TEMPORARY VIEW itm_log_day
    AS
    SELECT
      sum(ext_pv) as pv
      ,count(DISTINCT visitor_id) FILTER (WHERE ext_pv>0) as uv
      ,stat_date
      ,cate_id
      ,item_id
    FROM log_ri_base
    GROUP BY stat_date
      ,cate_id
      ,item_id
    ;
    
    
    CREATE TEMPORARY VIEW itm_log_hh_00
    AS
    SELECT
      sum(ext_pv) as pv
      ,count(DISTINCT visitor_id) FILTER (WHERE ext_pv>0) as uv
      ,stat_date
      ,stat_hour
      ,item_id
      ,cate_id
    FROM log_ri_base
    GROUP BY stat_date
      ,stat_hour
      ,cate_id
      ,item_id
    ;
    
    BEGIN STATEMENT SET;
    
    INSERT INTO vvp_ec_ads.ads_ec_log
    SELECT
      a.stat_date
      ,cast(a.item_id as varchar) as item_id
      ,a.cate_id
      ,b.cate_name
      ,cast(b.industry_id as varchar) as industry_id
      ,cast(b.xcat1_id as varchar) as xcat1_id
      ,cast(b.xcat2_id as varchar) as xcat2_id
      ,cast(b.xcat3_id as varchar) as xcat3_id
      ,cast(b.cate_level1_id as varchar) as cate_level1_id
      ,cast(b.cate_level2_id as varchar) as cate_level2_id
      ,cast(b.is_sw as varchar) as is_sw
      ,a.pv as mbr_ipv_1d
      ,a.uv as mbr_ipv_uv_1d
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as log_gmt_modify
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as gmt_modify
    FROM itm_log_day a
    JOIN ec.dim_tm_cate_360_2pt_ri FOR SYSTEM_TIME AS OF PROCTIME() AS b
    ON vvp_dt_rtcdm.DateAddOrSub(a.stat_date, -2) = b.stat_date 
        AND a.cate_id = b.cate_id
    ;
    
    --Write
    INSERT INTO vvp_ec_ads.ads_ec_log_hh
    
    SELECT
      a.stat_date
      ,a.stat_hour
      ,cast(a.item_id as varchar) as item_id
      ,a.cate_id
      ,b.cate_name
      ,cast(b.industry_id as varchar) as industry_id
      ,cast(b.xcat1_id as varchar) as xcat1_id
      ,cast(b.xcat2_id as varchar) as xcat2_id
      ,cast(b.xcat3_id as varchar) as xcat3_id
      ,cast(b.cate_level1_id as varchar) as cate_level1_id
      ,cast(b.cate_level2_id as varchar) as cate_level2_id
      ,cast(b.is_sw as varchar) as is_sw
      ,a.pv as mbr_ipv_1h
      ,a.uv as mbr_ipv_uv_1h
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as log_gmt_modify
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as gmt_modify
    FROM itm_log_hh_00 a
    JOIN ec.dim_tm_cate_360_2pt_ri FOR SYSTEM_TIME AS OF PROCTIME() AS b
    ON vvp_ec_ads.DateAddOrSub(a.stat_date, -2) = b.stat_date 
        AND a.cate_id = b.cate_id
    ;
    
    END;
  3. In the node editor, click Real-time Configuration on the right side of the code editor to configure the Flink Resource Information, Script Parameters, and Flink Runtime Parameters.

    image

    Preset Flink SQL Stream real-time configuration - Expert mode code

    {
      "nodes": [
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 1,
          "type": "StreamExecTableSourceScan",
          "desc": "Source: vvp_dt_rtcdm_dwd_tb_trd_ord_pay_nrt_ri[71980]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 2,
          "type": "StreamExecCalc",
          "desc": "Calc[71981]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 3,
          "type": "StreamExecLookupJoin",
          "desc": "LookupJoin[71982]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 4,
          "type": "StreamExecCalc",
          "desc": "Calc[71983]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 6,
          "state": [
            {
              "userDefined": false,
              "name": "groupAggregateState",
              "index": 0,
              "ttl": "36 h"
            }
          ],
          "type": "StreamExecGroupAggregate",
          "desc": "GroupAggregate[71985]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 7,
          "type": "StreamExecCalc",
          "desc": "Calc[71986]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 8,
          "type": "StreamExecSink",
          "desc": "ConstraintEnforcer[71987]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "2"
          },
          "id": 10,
          "state": [
            {
              "userDefined": false,
              "name": "sinkMaterializeState",
              "index": 0,
              "ttl": "36 h"
            }
          ],
          "type": "StreamExecSink",
          "desc": "SinkMaterializer[71987]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "2"
          },
          "id": 11,
          "type": "StreamExecSink",
          "desc": "Sink: vvp_dt_ads_tb_dev_ads_tb_idec_seckill_cate_bc_trd_flow_htr_000[71987]"
        }
      ],
      "vertices": {
        "2d95a2974e3b3137fd533ecfd3490bc5": [
          10,
          11
        ],
        "717c7b8afebbfb7137f6f0f99beb2a94": [
          1,
          2,
          3,
          4
        ],
        "44b79c13fdb45883c7f21ee510155f4d": [
          6,
          7,
          8
        ]
      },
      "edges": [
        {
          "mode": "PIPELINED",
          "source": 1,
          "strategy": "FORWARD",
          "target": 2
        },
        {
          "mode": "PIPELINED",
          "source": 2,
          "strategy": "FORWARD",
          "target": 3
        },
        {
          "mode": "PIPELINED",
          "source": 3,
          "strategy": "FORWARD",
          "target": 4
        },
        {
          "mode": "PIPELINED",
          "source": 4,
          "strategy": "HASH",
          "target": 6
        },
        {
          "mode": "PIPELINED",
          "source": 6,
          "strategy": "FORWARD",
          "target": 7
        },
        {
          "mode": "PIPELINED",
          "source": 7,
          "strategy": "FORWARD",
          "target": 8
        },
        {
          "mode": "PIPELINED",
          "source": 8,
          "strategy": "HASH",
          "target": 10
        },
        {
          "mode": "PIPELINED",
          "source": 10,
          "strategy": "FORWARD",
          "target": 11
        }
      ],
      "ssgProfiles": [
        {
          "managed": {},
          "name": "0",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "992 mb",
          "extended": {}
        },
        {
          "managed": {
            "STATE_BACKEND": "512 mb"
          },
          "name": "1",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "480 mb",
          "extended": {}
        },
        {
          "managed": {
            "STATE_BACKEND": "512 mb"
          },
          "name": "2",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "480 mb",
          "extended": {}
        }
      ]
    }

    Preset Flink SQL Stream real-time configuration - Flink runtime parameters - Other configurations

    blob.fetch.backlog: 1000
    taskmanager.debug.memory.log-interval: 5000
  4. After you configure the real-time settings, click Save above the code editor. Then click Publish. In the Publish Content panel that appears in the lower-right corner, click Start Publishing To Production and follow the prompts to confirm.

    image
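
For intuition, the daily pv/uv aggregation in the itm_log_day view above (SUM of ext_pv, plus COUNT(DISTINCT visitor_id) filtered to rows with ext_pv > 0) can be sketched in pandas. The sample rows below are fabricated for illustration:

```python
import pandas as pd

# Fabricated sample rows mimicking the dws_log_all_itm_ipv_ri source
logs = pd.DataFrame({
    "visit_time": ["2025010109", "2025010110", "2025010111", "2025010112"],
    "visitor_id": ["u1", "u2", "u1", "u3"],
    "item_id":    ["i1", "i1", "i1", "i2"],
    "cate_id":    ["c1", "c1", "c1", "c2"],
    "ext_pv":     [2, 1, 0, 3],
})
# substring(visit_time, 1, 8) -> stat_date
logs["stat_date"] = logs["visit_time"].str[:8]

keys = ["stat_date", "cate_id", "item_id"]
# pv: SUM(ext_pv)
pv = logs.groupby(keys)["ext_pv"].sum().rename("pv")
# uv: COUNT(DISTINCT visitor_id) FILTER (WHERE ext_pv > 0)
uv = logs[logs["ext_pv"] > 0].groupby(keys)["visitor_id"].nunique().rename("uv")
daily = pv.to_frame().join(uv).fillna(0).astype({"uv": int}).reset_index()
print(daily)
```

The hourly view itm_log_hh_00 is the same aggregation with stat_hour added to the grouping keys.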

Step 4: Enter the personal development environment

The personal development environment supports custom container images, connections to your own NAS and Git repositories, and Python programming in Notebooks.

In DataStudio, click the image icon at the top of the page. In the drop-down menu, select the personal development environment that you want to enter and wait for the page to load.

image

Step 5: Program and debug in Python

DataWorks is deeply integrated with DSW. After you enter the personal development environment, DataStudio supports writing, debugging, running, and scheduling Python code.

Important

You must complete Step 4: Enter the personal development environment before you start this step.

  1. On the DataStudio page, in your personal development environment, click the workspace folder. Click the image icon to the right of Personal Folder. An untitled file appears in the list on the left. Name the file, press Enter, and wait for it to be generated.

    Preset file name: ec_item_rec.py.

    image

  2. In the code editor on the Python file page, enter the preset Python code. Then, click Run Python File above the code editor and check the results in the Terminal at the bottom of the page.

    image

    image

    Preset Python code

    import pandas as pd
    from surprise import Dataset, Reader, SVD
    from surprise.model_selection import train_test_split
    from surprise import accuracy
    
    # Create sample data
    data_dict = {
        'user_id': [1, 1, 1, 2, 2, 2, 3, 3, 4],
        'item_id': [101, 102, 103, 101, 104, 105, 102, 105, 101],
        'rating': [5, 3, 4, 2, 4, 5, 4, 5, 3]
    }
    
    # Convert the data to a DataFrame
    df = pd.DataFrame(data_dict)
    
    # Prepare the dataset using the Surprise library
    reader = Reader(rating_scale=(1, 5))
    data = Dataset.load_from_df(df[['user_id', 'item_id', 'rating']], reader)
    
    # Split the dataset into a training set and a test set
    trainset, testset = train_test_split(data, test_size=0.2)
    
    # Use the SVD algorithm for recommendations
    model = SVD()
    model.fit(trainset)
    
    # Make predictions
    predictions = model.test(testset)
    
    # Calculate RMSE
    rmse = accuracy.rmse(predictions)
    print(f'RMSE: {rmse:.2f}')
    
    # Get recommended products for a user
    def get_recommendations(user_id, model, all_items, n=3):
        item_ids = all_items['item_id'].unique()
        user_item_col = all_items[(all_items['user_id'] == user_id)]['item_id']
        unseen_items = [item for item in item_ids if item not in user_item_col.values]
    
        # Predict ratings for unseen items
        predictions = []
        for item in unseen_items:
            pred = model.predict(user_id, item)
            predictions.append((item, pred.est))
    
        # Sort by predicted rating
        predictions.sort(key=lambda x: x[1], reverse=True)
        return predictions[:n]
    
    # Get product recommendations
    all_items = df
    user_id = 1  # The ID of the user to get recommendations for
    recommendations = get_recommendations(user_id, model, all_items)
    
    print(f'Recommended products for user {user_id}:')
    for item_id, score in recommendations:
        print(f'Product ID: {item_id}, Predicted score: {score:.2f}')

    Python environment installation

    pip install pandas scikit-surprise
  3. To debug the code, click Debug Python File above the code editor or click the image icon in the panel to the left of the code editor. You can set breakpoints by clicking to the left of the line numbers.

    image

Step 6: Explore data with a Notebook

Notebook data exploration operations are performed in the personal development environment. You must complete Step 4: Enter the personal development environment before you begin.

Create a Notebook

  1. Go to DataStudio > Data Development.

  2. In the Personal Folder, right-click the target folder and select New Notebook.

  3. Enter a name for the Notebook and press Enter or click a blank area on the page to apply the name.

  4. In the personal folder, click the Notebook name to open it in the editor.

Use a Notebook

Note

The following steps are independent and can be performed in any order.

  • Notebook multi-engine development

    EMR Spark SQL

    1. In the DataWorks Notebook, click the image button to create a new SQL Cell.

    2. In the SQL Cell, enter the following statement to query the dim_ec_mbr_user_info table.

      dim_ec_mbr_user_info

      -- Description: Queries the basic member information of an e-commerce platform based on the member information source table and the region source table.
      USE openlake_win.default;
      SELECT  user.user_id AS user_id
              ,user.nick AS nick
              ,user.gmt_create AS gmt_create
              ,user.gmt_modified AS gmt_modified
              ,user.reg_fullname AS reg_fullname
              ,user.reg_mobile_phone AS reg_mobile_phone
              ,user.reg_email AS reg_email
              ,user.reg_gender AS reg_gender
              ,user.reg_gender_name AS reg_gender_name
              ,user.reg_birthdate AS reg_birthdate
              ,user.reg_address AS reg_address
              ,user.reg_nation_id AS reg_nation_id
              ,user.reg_nation_name AS reg_nation_name
              ,user.reg_prov_id AS reg_prov_id
              ,area_prov.name AS reg_prov_name
              ,user.reg_city_id AS reg_city_id
              ,area_city.name AS reg_city_name
              ,user.user_regip AS user_regip
              ,user.id_card_type AS id_card_type
              ,user.id_card_type_name AS id_card_type_name
              ,user.id_card_number AS id_card_number
              ,null as id_gender
              ,null as id_bday
              ,(2024 - CAST(SUBSTR(user.id_card_number,7,4) AS INT)) AS id_age
              ,user.user_regdate AS user_regdate
              ,user.user_active_type AS user_active_type
              ,user.user_active_name AS user_active_name
              ,user.user_active_time AS user_active_time
              ,user.vip_level AS vip_level
              ,user.vip_level_name AS vip_level_name
              ,user.is_delete AS is_delete
      FROM    (
                  SELECT  id    -- Primary key
                          ,gmt_create    -- Creation time
                          ,gmt_modified    -- Modification time
                          ,user_id    -- Member numeric ID
                          ,nick    -- Member NICK. Member nickname
                          ,reg_fullname    -- Real name for individual users, enterprise name for enterprise users
                          ,reg_mobile_phone    -- Mobile number bound at registration
                          ,reg_email    -- Email entered at registration (can be modified by the user)
                          ,reg_gender    -- Gender entered at registration (F for female, M for male, others are unknown, meaning gender is private)
                          ,CASE    WHEN reg_gender='F' THEN 'Female'
                                   WHEN reg_gender='M' THEN 'Male' 
                                   ELSE 'Unknown' 
                           END AS reg_gender_name    -- Gender entered at registration (F for female, M for male, others are unknown, meaning gender is private)
                          ,reg_birthdate    -- Birthday entered at registration (can be modified by the user)
                          ,reg_address    -- Address entered at registration (can be modified by the user)
                          ,reg_nation_id    -- Country ID entered at registration (currently empty)
                          ,CASE    WHEN reg_nation_id='cn' THEN 'China' 
                                   ELSE 'Outside China' 
                           END AS reg_nation_name
                          ,reg_prov_id    -- Province ID entered at registration
                          ,reg_city_id    -- City ID entered at registration
                          ,user_regip    -- Registration IP
                          ,id_card_type    -- Member certificate type 0: Unknown 1: ID card 2: Business license number
                          ,CASE    WHEN id_card_type=0 THEN 'Unknown'
                                   WHEN id_card_type=1 THEN 'ID Card'
                                   WHEN id_card_type=2 THEN 'Business License Number' 
                                   ELSE 'Abnormal' 
                           END AS id_card_type_name
                          ,id_card_number    -- ID card number for individual users, business license number for enterprise users, accuracy not guaranteed without verification
                          ,user_regdate    -- Registration time
                          ,user_active_type    -- User activation method
                          ,CASE    WHEN user_active_type='email' THEN 'Email'
                                   WHEN user_active_type='mobile_phone' THEN 'Mobile Phone' 
                                   ELSE 'Abnormal' 
                           END AS user_active_name    -- User activation method
                          ,user_active_time    -- Activation time
                          ,cast(vip_level AS BIGINT) AS vip_level    -- VIP level
                          ,CASE    WHEN vip_level>0 AND vip_level<=3 THEN 'Junior'
                                   WHEN vip_level>3 AND vip_level<=6 THEN 'Intermediate'
                                   WHEN vip_level>6 AND vip_level<=10 THEN 'Senior' 
                                   WHEN vip_level>10  THEN 'Premium' 
                           ELSE 'Abnormal'
                           END  AS vip_level_name
                          ,is_delete    -- Is deleted
                  FROM    ods_mbr_user_info
              ) AS USER
      LEFT JOIN (
                    SELECT  id,pid,name,shortname,longitude,latitude,level,sort
                    FROM    ods_t_area
                ) AS area_prov
      ON      user.reg_prov_id = area_prov.id 
      LEFT JOIN    (
                  SELECT  id,pid,name,shortname,longitude,latitude,level,sort
                  FROM    ods_t_area
              ) AS area_city
      ON      user.reg_city_id = area_city.id
      ;
    3. In the lower-right corner of the SQL Cell, set the SQL Cell type to EMR Spark SQL and the computing resource to openlake_serverless_spark.

      image

    4. Click the Run button, wait for the execution to complete, and view the data results.
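
As an aside, the id_age column in the EMR Spark SQL query above derives an approximate age from the ID card number: characters 7-10 (1-based) of a Chinese resident ID card hold the birth year, which is what SUBSTR(user.id_card_number, 7, 4) extracts. A minimal Python sketch of the same arithmetic, using a fabricated sample number:

```python
def id_age(id_card_number: str, ref_year: int = 2024) -> int:
    """Mirror the query's (2024 - CAST(SUBSTR(id_card_number, 7, 4) AS INT)):
    characters 7-10 (1-based) of the ID card number are the birth year."""
    return ref_year - int(id_card_number[6:10])

print(id_age("110101199003074567"))  # 34
```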

    StarRocks SQL

    1. In the DataWorks Notebook, click the image button to create a new SQL Cell.

    2. In the SQL Cell, enter the following statement to query the dws_ec_trd_cate_commodity_gmv_kpi_fy table.

      dws_ec_trd_cate_commodity_gmv_kpi_fy

      -- Description: Queries data metrics such as "fiscal year_successful order payment amount" and "fiscal year_transaction amount completion rate" based on the transaction order fact table and the basic product information dimension table.
      USE `openlake_win`.`default`;
      select   t1.cate_id, t1.cate_name, t1.commodity_id, t1.commodity_name, round(10*sum(t1.total_fee),4) as pay_ord_amt_fy, round((10*sum(t1.total_fee)/30000000),4) as kpi_gmv_rate_fy
      from    (
                  select  DATE_FORMAT(a.gmt_create,'yyyymmdd') as stat_date
                          ,a.sub_order_id, a.buyer_id, a.item_id, a.biz_type, a.pay_status, a.total_fee/100 as total_fee, b.cate_id, b.cate_name, b.commodity_id, b.commodity_name 
                  from    `openlake_win`.`default`.dwd_ec_trd_create_ord_di a
                  left outer join (
                                      select  distinct item_id, cate_id, cate_name, commodity_id, commodity_name, shop_id, shop_nick
                                      from    `openlake_win`.`default`.dim_ec_itm_item_info
                                  ) b
                  on      a.item_id = b.item_id
                  and     a.shop_id = b.shop_id
              ) t1
      where   t1.pay_status in ('2')
      and     t1.biz_type in ('2','3','4')
      group by   t1.cate_id, t1.cate_name, t1.commodity_id, t1.commodity_name
      ;
    3. In the lower-right corner of the SQL Cell, set the SQL Cell type to StarRocks SQL and the computing resource to openlake_starrocks.

      image

    4. Click the Run button, wait for the execution to complete, and view the data results.

    Hologres SQL

    1. In the DataWorks Notebook, click the image button to create a new SQL Cell.

    2. In the SQL Cell, enter the following statement to query the dws_ec_mbr_cnt_std table.

      dws_ec_mbr_cnt_std

      -- Description: Calculates and transforms data from the "basic member information dimension table" to obtain data metrics such as "number of existing members" and gets the cube statistics for the number of existing members up to the current day.
      SELECT    IF(grouping(reg_prov_id) = 0, reg_prov_id, '-9999') as reg_prov_id
              , IF(grouping(reg_prov_name) = 0, reg_prov_name, 'All') as reg_prov_name
              , IF(grouping(reg_gender) = 0, reg_gender, '-9999') as reg_gender
              , IF(grouping(reg_gender_name) = 0, reg_gender_name, 'All') as reg_gender_name
              , IF(grouping(age_tag) = 0, age_tag, '-9999') as age_tag
              , IF(grouping(user_active_type) = 0, user_active_type, '-9999') as user_active_type
              , IF(grouping(user_active_name) = 0, user_active_name, 'All') as user_active_name
              , IF(grouping(vip_level) = 0, vip_level, '-9999') as vip_level
              , IF(grouping(vip_level_name) = 0, vip_level_name, 'All') as vip_level_name 
              , count(distinct user_id) as mbr_cnt
      from (
          select    reg_prov_id
                  , reg_prov_name
                  , reg_gender
                  , reg_gender_name
                  , case when cast(substr(reg_birthdate,1,4) as int)>=2010 and cast(substr(reg_birthdate,1,4) as int)<2020 then 'Post-2010s' 
                          when cast(substr(reg_birthdate,1,4) as int)>=2000 and cast(substr(reg_birthdate,1,4) as int)<2010 then 'Post-2000s' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1990 and cast(substr(reg_birthdate,1,4) as int)<2000 then 'Post-90s' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1980 and cast(substr(reg_birthdate,1,4) as int)<1990 then 'Post-80s' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1970 and cast(substr(reg_birthdate,1,4) as int)<1980 then 'Post-70s' 
                          else 'Other' 
                    end as age_tag
                  , user_active_type
                  , user_active_name
                  , vip_level
                  , vip_level_name 
                  , user_id
          from    openlake_win.default.dim_ec_mbr_user_info
      ) _main       
      group by 
      grouping sets(
          (reg_prov_id, reg_prov_name)
         ,(reg_gender, reg_gender_name)
         ,(age_tag)
         ,(user_active_type, user_active_name)
         ,(vip_level, vip_level_name)
         ,()
      );
    3. In the lower-right corner of the SQL Cell, set the SQL Cell type to Hologres SQL and the computing resource to openlake_hologres.

      image

    4. Click the Run button, wait for the execution to complete, and view the data results.
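
The GROUPING SETS clause in the Hologres SQL query above computes one aggregate per grouping set plus a grand total for the empty set (). As a rough pandas equivalent, using a toy member table with column names borrowed from the query and fabricated data:

```python
import pandas as pd

# Toy member table (column names borrowed from the query, data made up)
members = pd.DataFrame({
    "reg_gender_name": ["Female", "Male", "Male"],
    "vip_level_name":  ["Junior", "Junior", "Senior"],
    "user_id":         ["u1", "u2", "u3"],
})

# GROUPING SETS((a), (b), ()) is the union of one GROUP BY per set
frames = []
for keys in (["reg_gender_name"], ["vip_level_name"], []):
    if keys:
        g = members.groupby(keys)["user_id"].nunique().reset_index(name="mbr_cnt")
    else:  # the empty set () yields the grand total
        g = pd.DataFrame({"mbr_cnt": [members["user_id"].nunique()]})
    frames.append(g)
cube = pd.concat(frames, ignore_index=True)

# NaN in a key column marks rows where that key was not grouped; the query's
# IF(grouping(...) = 0, ..., 'All') fallback maps these to 'All'
print(cube.fillna("All"))
```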

    MaxCompute SQL

    1. In the DataWorks Notebook, click the image button to create a new SQL Cell.

    2. In the SQL Cell, enter the following statement to query the dws_ec_mbr_cnt_std table.

      Sample query for the dws_ec_mbr_cnt_std table

      -- Description: Queries the "historical to-date existing member count cube statistics table" in the data warehouse base layer.
      set odps.task.major.version=flighting;
      set odps.namespace.schema=true;
      set odps.sql.allow.namespace.schema=true;
      set odps.service.mode=only;
      set odps.sql.unstructured.data.split.size=1;
      
      SELECT * 
      FROM openlake_win.default.dws_ec_mbr_cnt_std 
      LIMIT 200;
    3. In the lower-right corner of the SQL Cell, set the SQL Cell type to MaxCompute SQL and the computing resource to openlake_maxcompute.


    4. Click the Run button, wait for the execution to complete, and view the data results.

  • Notebook interactive data analysis
    1. In the DataWorks Notebook, click the image button to create a new Python Cell.

    2. In the upper-right corner of the Python Cell, click the image button to open the DataWorks Copilot intelligent programming assistant.

    3. In the DataWorks Copilot input box, enter the following requirement to generate an ipywidgets interactive component for querying member age.

      Note

      Description: Use Python to generate a slider widget for member age. The value range is from 1 to 100, with a default value of 20. Monitor changes to the widget's value in real time and save the value to a global variable named query_age.

    4. Review the Python code generated by DataWorks Copilot and click the Accept button.


    5. Click the Run button for the Python Cell, wait for the execution to complete, and view the generated interactive component. You can run the code generated by Copilot or the preset code. You can also slide the interactive component to select the target age.

      Sample code for generating an ipywidgets interactive component

      import ipywidgets as widgets
      
      # Create a slider widget
      slider = widgets.IntSlider(
          min = 1,
          max = 100,
          value = 20,
          description = 'Age:',
      )
      
      # Define the global variable query_age, initialized to the slider's default value
      query_age = 20
      
      # Define a function to handle slider value changes
      def on_slider_change(change):
          global query_age
          query_age = change.new
      
      # Bind the function to the slider's value change event
      slider.observe(on_slider_change, names='value')
      
      # Display the slider
      display(slider)

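When the slider moves, ipywidgets calls the observer with a change object whose `new` attribute carries the updated value, which is what `on_slider_change` stores in `query_age`. Outside a notebook, that contract can be mimicked with a stand-in class (a minimal sketch of the observe mechanism, not the real ipywidgets implementation; `FakeSlider` is hypothetical):

```python
class FakeSlider:
    """Stand-in for widgets.IntSlider: fires observers on value changes."""

    def __init__(self, value):
        self._value = value
        self._observers = []

    def observe(self, handler, names="value"):
        self._observers.append(handler)

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new):
        old, self._value = self._value, new
        # Mimic the change object ipywidgets passes to observers
        change = type("Change", (), {"old": old, "new": new})()
        for handler in self._observers:
            handler(change)

query_age = 20  # mirrors the slider's default value

def on_slider_change(change):
    global query_age
    query_age = change.new

slider = FakeSlider(20)
slider.observe(on_slider_change, names="value")
slider.value = 35        # simulate dragging the slider
print(query_age)         # -> 35
```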

    6. In the DataWorks Notebook, click the image button to create a new SQL Cell.

    7. In the SQL Cell, enter the following query statement, which includes the member age variable ${query_age} defined in Python.

      SELECT * FROM openlake_win.default.dim_ec_mbr_user_info
      WHERE CAST(id_age AS INT) >= ${query_age};
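Before the cell runs, the Notebook replaces `${query_age}` with the current value of the Python global. Conceptually the expansion behaves like `string.Template` substitution (a sketch of the idea only; DataWorks' actual substitution logic is not shown here):

```python
from string import Template

query_age = 30  # value captured from the slider widget

# Expand ${query_age} into the SQL text before it is sent to the engine
sql = Template(
    "SELECT * FROM openlake_win.default.dim_ec_mbr_user_info\n"
    "WHERE CAST(id_age AS INT) >= ${query_age};"
).substitute(query_age=query_age)

print(sql)
```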
    8. In the lower-right corner of the SQL Cell, set the SQL Cell type to Hologres SQL and the computing resource to openlake_hologres.


    9. Click the Run button, wait for the execution to complete, and view the data results.

    10. In the results, click the image button to generate a chart.

  • Notebook model development and training
    1. In the DataWorks Notebook, click the image button to create a new SQL Cell.

    2. In the SQL Cell, enter the following statement to query the ods_trade_order table.

      SELECT * FROM openlake_win.default.ods_trade_order;
    3. Write the SQL query result to a DataFrame variable: click the default df variable name in the SQL Cell and enter a custom DataFrame variable name, such as df_ml.


    4. Click the Run button for the SQL Cell, wait for the execution to complete, and view the data results.

    5. In the DataWorks Notebook, click the image button to create a new Python Cell.

    6. In the Python Cell, enter the following statement to clean and process the data using Pandas and store it in a new DataFrame variable named df_ml_clean.

      import pandas as pd
      
      def clean_data(df_ml):
          # Generate a new column: estimated order total = item price * quantity
          df_ml['predict_total_fee'] = df_ml['item_price'].astype(float).values * df_ml['buy_amount'].astype(float).values
          # Rename the 'total_fee' column to 'actual_total_fee'
          df_ml = df_ml.rename(columns={'total_fee': 'actual_total_fee'})
          return df_ml
      
      df_ml_clean = clean_data(df_ml.copy())
      df_ml_clean.head()
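Before running the cleaning step on the full table, `clean_data` can be sanity-checked on a couple of synthetic rows (the prices and quantities below are made up for illustration; requires pandas):

```python
import pandas as pd

def clean_data(df_ml):
    # Same cleaning logic as above: derive the estimate, rename the actual fee
    df_ml["predict_total_fee"] = (
        df_ml["item_price"].astype(float) * df_ml["buy_amount"].astype(float)
    )
    return df_ml.rename(columns={"total_fee": "actual_total_fee"})

# Hypothetical sample rows
sample = pd.DataFrame(
    {"item_price": ["9.9", "25.0"], "buy_amount": [2, 1], "total_fee": [19.8, 25.0]}
)
out = clean_data(sample.copy())
print(out["predict_total_fee"].tolist())   # price * quantity per row
print("actual_total_fee" in out.columns)   # the rename took effect
```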
    7. Click the Run button for the Python Cell, wait for the execution to complete, and view the data cleaning results.

    8. In the DataWorks Notebook, click the image button to create a new Python Cell.

    9. In the Python Cell, enter the following statement to build, train, and test a linear regression machine learning model.

      import pandas as pd  
      from sklearn.model_selection import train_test_split  
      from sklearn.linear_model import LinearRegression  
      from sklearn.metrics import mean_squared_error  
        
      # Get item price and total fee
      X = df_ml_clean[['predict_total_fee']].values  
      y = df_ml_clean['actual_total_fee'].astype(float).values  
      
      # Prepare the data  
      X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42)  
      
      # Create and train the model  
      model = LinearRegression()  
      model.fit(X_train, y_train)  
        
      # Predict and evaluate  
      y_pred = model.predict(X_test)  
      for index, (x_t, y_pre, y_t) in enumerate(zip(X_test, y_pred, y_test)):
          print("[{:>2}] input: {:<10} prediction:{:<10} gt: {:<10}".format(str(index+1), f"{x_t[0]:.3f}", f"{y_pre:.3f}", f"{y_t:.3f}"))
      
      # Calculate the mean squared error (MSE)
      mse = mean_squared_error(y_test, y_pred)  
      print("Mean Squared Error (MSE):", mse)
    10. Click the Run button, wait for the execution to complete, and view the model training and evaluation results.
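With a single feature, the LinearRegression fit above is ordinary least squares, which has a closed-form slope and intercept; a stdlib-only sketch on hypothetical, exactly linear data shows what a near-zero MSE means:

```python
def fit_line(xs, ys):
    """Closed-form OLS for y = slope * x + intercept."""
    n = len(xs)
    mx = sum(xs) / n
    my = sum(ys) / n
    slope = sum((x - mx) * (y - my) for x, y in zip(xs, ys)) / sum(
        (x - mx) ** 2 for x in xs
    )
    return slope, my - slope * mx

def mse(xs, ys, slope, intercept):
    """Mean squared error of the fitted line on (xs, ys)."""
    return sum((slope * x + intercept - y) ** 2 for x, y in zip(xs, ys)) / len(xs)

# Hypothetical exactly-linear data: actual fee == 1.0 * estimated fee
xs = [10.0, 20.0, 30.0, 40.0]
ys = [10.0, 20.0, 30.0, 40.0]
slope, intercept = fit_line(xs, ys)
print(round(slope, 6), round(intercept, 6))   # 1.0 0.0
print(mse(xs, ys, slope, intercept))          # 0.0
```

When the estimated order total tracks the actual total closely, the fitted slope approaches 1, the intercept approaches 0, and the MSE reported by the sklearn cell approaches 0.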