
MaxCompute: Distributed Pandas processing based on MaxFrame

Last Updated: Jun 12, 2025

MaxFrame allows you to analyze data using the same APIs as Pandas in a distributed environment. With MaxFrame, you can achieve data analysis and computation on MaxCompute with performance that is dozens of times faster than open-source Pandas. This topic describes how to use common Pandas operators within MaxFrame.

Prerequisite

MaxFrame has been installed. For additional details, see Preparations.

Prepare data

  1. Execute the following code in a Python environment with MaxFrame installed to prepare test tables and data.

    from odps import ODPS
    from odps.df import DataFrame as ODPSDataFrame
    from maxframe.session import new_session
    import maxframe.dataframe as md
    import pandas as pd
    import os
    
    o = ODPS(
        # Ensure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set to your AccessKey ID.
        # Ensure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set to your AccessKey secret.
        # We recommend that you do not directly use the strings of your AccessKey ID and AccessKey secret.
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    data_sets = [{
        "table_name": "product",
        "table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint",
        "source_type": "records",
        "records" : [
            [1, 100, 'Nokia', 1000],
            [2, 200, 'Apple', 5000],
            [3, 300, 'Samsung', 9000]
        ],
    },
    {
        "table_name" : "sales",
        "table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
        "source_type": "records",
        "records" : [
            [1, 1, 100, 101, 2008, 10, 5000],
            [2, 2, 300, 101, 2009, 7, 4000],
            [3, 4, 100, 102, 2011, 9, 4000],
            [4, 5, 200, 102, 2013, 6, 6000],
            [5, 8, 300, 102, 2015, 10, 9000],
            [6, 9, 100, 102, 2015, 6, 2000]
        ],
        "lifecycle": 5
    }]
    
    def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
        for index, data in enumerate(data_sets):
            table_name = data.get("table_name")
            table_schema = data.get("table_schema")
            source_type = data.get("source_type")
            
            if not table_name or not table_schema or not source_type:
                raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.")
    
            lifecycle = data.get("lifecycle", 5)
            table_name += suffix
            
            print(f"Processing {table_name}...")
            if drop_if_exists:
                print(f"Deleting {table_name}...")
                o.delete_table(table_name, if_exists=True)
            
            o.create_table(table_name, table_schema, lifecycle=lifecycle, if_not_exists=True)
    
            if source_type == "local_file":
                file_path = data.get("file")
                if not file_path:
                    raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.")
                sep = data.get("sep", ",")
                pd_df = pd.read_csv(file_path, sep=sep)
                ODPSDataFrame(pd_df).persist(table_name, drop_table=True, odps=o)
            elif source_type == 'records':
                records = data.get("records")
                if not records:
                    raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.")
                with o.get_table(table_name).open_writer() as writer:
                    writer.write(records)
            else:
                raise ValueError(f"Unknown data set source_type: {source_type}")
            
            print(f"Processed {table_name} Done")
    
    prepare_data(o, data_sets, "_maxframe_demo", True)

    Parameters:

    • ALIBABA_CLOUD_ACCESS_KEY_ID: To access the target MaxCompute project, you must set this environment variable with an AccessKey ID that has MaxCompute permissions. You can retrieve the AccessKey ID from the AccessKey Management page.

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET: Set this environment variable to the AccessKey secret that corresponds to the AccessKey ID.

    • your-default-project: The name of the MaxCompute project. To view the project name, log on to the MaxCompute console, and choose Workspace > Projects from the left-side navigation pane.

    • your-end-point: The endpoint of the region where the MaxCompute project resides. Select it based on your network connection method, for example, http://service.cn-chengdu.maxcompute.aliyun.com/api. For more information, see Endpoints.
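
    Before you run the script, make sure that both environment variables are set in your shell. The following optional snippet is a minimal pre-flight check, not part of the preparation script itself; it fails fast if either variable is missing:

    import os
    
    # Optional pre-flight check: stop early if the AccessKey credentials are not configured.
    for var in ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET"):
        if not os.getenv(var):
            raise EnvironmentError(f"Environment variable {var} is not set.")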

  2. Retrieve data from the sales_maxframe_demo and product_maxframe_demo tables by using the following SQL statements. An equivalent PyODPS query is shown after the results.

    --Query sales_maxframe_demo table
    SELECT * FROM sales_maxframe_demo;
    --Return results
    +------------+------------+------------+------------+------------+------------+------------+
    | index      | sale_id    | product_id | user_id    | year       | quantity   | price      |
    +------------+------------+------------+------------+------------+------------+------------+
    | 1          | 1          | 100        | 101        | 2008       | 10         | 5000       |
    | 2          | 2          | 300        | 101        | 2009       | 7          | 4000       |
    | 3          | 4          | 100        | 102        | 2011       | 9          | 4000       |
    | 4          | 5          | 200        | 102        | 2013       | 6          | 6000       |
    | 5          | 8          | 300        | 102        | 2015       | 10         | 9000       |
    | 6          | 9          | 100        | 102        | 2015       | 6          | 2000       |
    +------------+------------+------------+------------+------------+------------+------------+
    
    --Query product_maxframe_demo table data
    SELECT * FROM product_maxframe_demo;
    --Return results
    +------------+------------+--------------+---------------+
    | index      | product_id | product_name | current_price |
    +------------+------------+--------------+---------------+
    | 1          | 100        | Nokia        | 1000          |
    | 2          | 200        | Apple        | 5000          |
    | 3          | 300        | Samsung      | 9000          |
    +------------+------------+--------------+---------------+
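
    If you prefer to stay in Python, you can run the same checks through PyODPS instead of the MaxCompute SQL client. A minimal sketch that reuses the ODPS entry object o created in the previous step:

    # Preview the demo tables with PyODPS (reuses the ODPS entry object `o` from step 1).
    with o.execute_sql("SELECT * FROM sales_maxframe_demo").open_reader() as reader:
        for record in reader:
            print(record)
    
    with o.execute_sql("SELECT * FROM product_maxframe_demo").open_reader() as reader:
        for record in reader:
            print(record)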

Use MaxFrame for data analysis

  • Scenario 1: Use the merge method to join the two tables and retrieve, for each sale_id in the sales_maxframe_demo table, the corresponding product_name together with the year and price of the sale

    • Sample code:

      from odps import ODPS
      from maxframe.session import new_session
      import maxframe.dataframe as md
      import os
      
      o = ODPS(
          # Set the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable to your AccessKey ID.
          # Set the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable to your AccessKey secret.
          # We recommend that you do not directly use the strings of your AccessKey ID and AccessKey secret.
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
          project='your-default-project',
          endpoint='your-end-point',
      )
      
      session = new_session(o)
      
      # The session ID is a string used to associate MaxFrame tasks, which is important for debugging and tracking task status.
      print(session.session_id)
      
      sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
      product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
      
      # df is evaluated lazily: no computation runs until you call df.execute().
      # All computation is performed inside the MaxCompute cluster, which avoids unnecessary data transfer and blocking.
      df = sales.merge(product, left_on="product_id", right_index=True)
      df = df[["product_name", "year", "price"]]
      
      print(df.execute().fetch())
      
      # Save the result to a MaxCompute table and destroy the session.
      md.to_odps_table(df, "result_df", overwrite=True).execute()
      
      session.destroy()

      Returned result:

      index product_name  year  price                   
      1            Nokia  2008   5000
      2          Samsung  2009   4000
      3            Nokia  2011   4000
      4            Apple  2013   6000
      5          Samsung  2015   9000
      6            Nokia  2015   2000
    • Performance comparison:

      With a sales table of 50 million records (1.96 GB) and a product table of 100,000 records (3 MB), the measured time consumption is as follows:

      Environment                Time consumption (unit: seconds)
      Local Pandas (V1.3.5)      65.8
      MaxFrame                   22
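
      For reference, the local Pandas baseline in the comparison corresponds roughly to the sketch below. It runs the same merge on in-memory DataFrames built from the demo records used in the preparation step (the timings above were measured on the larger 50-million-record dataset, not this toy data):

      import pandas as pd
      
      # Local Pandas sketch of Scenario 1, using the demo records from the preparation step.
      product_pd = pd.DataFrame(
          [[1, 100, 'Nokia', 1000], [2, 200, 'Apple', 5000], [3, 300, 'Samsung', 9000]],
          columns=['index', 'product_id', 'product_name', 'current_price'],
      ).set_index('index')
      
      sales_pd = pd.DataFrame(
          [[1, 1, 100, 101, 2008, 10, 5000], [2, 2, 300, 101, 2009, 7, 4000],
           [3, 4, 100, 102, 2011, 9, 4000], [4, 5, 200, 102, 2013, 6, 6000],
           [5, 8, 300, 102, 2015, 10, 9000], [6, 9, 100, 102, 2015, 6, 2000]],
          columns=['index', 'sale_id', 'product_id', 'user_id', 'year', 'quantity', 'price'],
      ).set_index('index')
      
      # Same join and projection as the MaxFrame code, but executed eagerly and locally.
      df = sales_pd.merge(product_pd.set_index('product_id'), left_on='product_id', right_index=True)
      print(df[['product_name', 'year', 'price']])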

  • Scenario 2: For each product sold, select the product ID, the first year of sales, and the quantity and price in that year

    • Sample code:

      from odps import ODPS
      from maxframe.session import new_session
      import maxframe.dataframe as md
      import os
      
      o = ODPS(
          # Set the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable to your AccessKey ID.
          # Set the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable to your AccessKey secret.
          # We recommend that you do not directly use the strings of your AccessKey ID and AccessKey secret.
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
          project='your-default-project',
          endpoint='your-end-point',
      )
      
      session = new_session(o)
      
      # The session ID is a string used to associate MaxFrame tasks, which is important for debugging and tracking task status.
      print(session.session_id)
      
      # Aggregate to obtain the first year of each product
      min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index")
      min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min'))
      
      # Join to find the corresponding sales records
      sales = md.read_odps_table("sales_maxframe_demo", index_col=['product_id', 'year'])
      result_df = md.merge(sales, min_year_df, 
                              left_index=True, 
                              right_on=['product_id','first_year'],
                              how='inner')
      
      # result_df is evaluated lazily: no computation runs until you call result_df.execute().
      # All computation is performed inside the MaxCompute cluster, which avoids unnecessary data transfer and blocking.
      result_df = result_df[['product_id', 'first_year', 'quantity', 'price']]
      
      print(result_df.execute().fetch())
      
      # Destroy the session
      session.destroy()

      Returned result:

          product_id  first_year  quantity  price
      100         100        2008        10   5000
      300         300        2009         7   4000
      200         200        2013         6   6000
    • Performance comparison:

      With a sales table of 50 million records (1.96 GB) and a product table of 100,000 records (3 MB), the measured time consumption is as follows:

      Environment                Time consumption (unit: seconds)
      Local Pandas (V1.3.5)      186
      MaxFrame                   21
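
      Unlike Scenario 1, the sample code above does not write its result back to MaxCompute. If you want to keep it, the same to_odps_table pattern from Scenario 1 can be reused before destroying the session (the destination table name below is only an example):

      # Optionally persist the Scenario 2 result before calling session.destroy().
      # The table name "result_first_year" is illustrative.
      md.to_odps_table(result_df, "result_first_year", overwrite=True).execute()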

  • Scenario 3: For each user, obtain the ID of the product on which the user spent the most

    Note

    This scenario demonstrates the use of multiple groupby, join, drop_duplicates, and sort_values operations.

    • Sample code:

      from odps import ODPS
      from maxframe.session import new_session
      import maxframe.dataframe as md
      import os
      
      o = ODPS(
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
          project='your-default-project',
          endpoint='your-end-point',
      )
      
      session = new_session(o)
      
      # The session ID is a string used to associate MaxFrame tasks, which is important for debugging and tracking task status.
      print(session.session_id)
      
      sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
      product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
      
      sales['total'] = sales['price'] * sales['quantity']
      
      product_cost_df = sales.groupby(['product_id', 'user_id'], as_index=False).agg(user_product_total=('total','sum'))
      product_cost_df = product_cost_df.merge(product, left_on="product_id", right_index=True, how='right')
      
      user_cost_df = product_cost_df.groupby('user_id').agg(max_total=('user_product_total', 'max'))
      merge_df = product_cost_df.merge(user_cost_df, left_on='user_id', right_index=True)
      
      # result_df is evaluated lazily: no computation runs until you call result_df.execute().
      # All computation is performed inside the MaxCompute cluster, which avoids unnecessary data transfer and blocking.
      result_df = (
          merge_df[merge_df['user_product_total'] == merge_df['max_total']][['user_id', 'product_id']]
          .drop_duplicates()
          .sort_values(['user_id'], ascending=[True])
      )
      
      print(result_df.execute().fetch())
      
      # Destroy the session
      session.destroy()

      Returned result:

           user_id  product_id
      100      101         100
      300      102         300
    • Performance comparison:

      With a sales table of 50 million records (1.96 GB) and a product table of 100,000 records (3 MB), the measured time consumption is as follows:

      Environment                Time consumption (unit: seconds)
      Local Pandas (V1.3.5)      176
      MaxFrame                   85
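
After you finish testing, you can drop the tables that were created in this topic. A minimal cleanup sketch that reuses the PyODPS entry object o from the preparation step (result_df is the output table written in Scenario 1):

    # Optional cleanup: remove the demo tables and the Scenario 1 output table.
    for table_name in ("sales_maxframe_demo", "product_maxframe_demo", "result_df"):
        o.delete_table(table_name, if_exists=True)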

Conclusion

MaxFrame seamlessly integrates with the Pandas interface to enable automatic distributed processing. It maintains robust data processing capabilities while also significantly enhancing the scale and efficiency of data analysis and computation.