すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:MaxFrameに基づく分散Pandas処理

最終更新日:Jan 08, 2025

MaxFrameを使用すると、分散環境でPandasと同じAPIを使用してデータを分析できます。 MaxFrameを使用すると、MaxComputeでのデータ分析と計算を、オープンソースのPandasよりも数十倍高速なパフォーマンスで実現できます。 このトピックでは、MaxFrame内で一般的なPandas演算子を使用する方法について説明します。

前提条件

MaxFrameがインストールされました。 詳細については、「準備」をご参照ください。

データの準備

  1. MaxFrameがインストールされたPython環境で次のコードを実行して、テストテーブルとデータを準備します。

    from odps import ODPS
    from maxframe.session import new_session
    import maxframe.dataframe as md
    import pandas as pd
    import os
    
    o = ODPS(
        # Ensure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set to your AccessKey ID.
        # Ensure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set to your AccessKey secret.
        # We recommend that you do not directly use the strings of your AccessKey ID and AccessKey secret.
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    data_sets = [{
        "table_name": "product",
        "table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint",
        "source_type": "records",
        "records" : [
            [1, 100, 'Nokia', 1000],
            [2, 200, 'Apple', 5000],
            [3, 300, 'Samsung', 9000]
        ],
    },
    {
        "table_name" : "sales",
        "table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
        "source_type": "records",
        "records" : [
            [1, 1, 100, 101, 2008, 10, 5000],
            [2, 2, 300, 101, 2009, 7, 4000],
            [3, 4, 100, 102, 2011, 9, 4000],
            [4, 5, 200, 102, 2013, 6, 6000],
            [5, 8, 300, 102, 2015, 10, 9000],
            [6, 9, 100, 102, 2015, 6, 2000]
        ],
        "lifecycle": 5
    }]
    
    def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
        for index, data in enumerate(data_sets):
            table_name = data.get("table_name")
            table_schema = data.get("table_schema")
            source_type = data.get("source_type")
            
            if not table_name or not table_schema or not source_type:
                raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.")
    
            lifecycle = data.get("lifecycle", 5)
            table_name += suffix
            
            print(f"Processing {table_name}...")
            if drop_if_exists:
                print(f"Deleting {table_name}...")
                o.delete_table(table_name, if_exists=True)
            
            o.create_table(name=table_name, table_schema=table_schema, lifecycle=lifecycle, if_not_exists=True)
    
            if source_type == "local_file":
                file_path = data.get("file")
                if not file_path:
                    raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.")
                sep = data.get("sep", ",")
                pd_df = pd.read_csv(file_path, sep=sep)
                ODPSDataFrame(pd_df).persist(table_name, drop_table=True)
            elif source_type == 'records':
                records = data.get("records")
                if not records:
                    raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.")
                with o.get_table(table_name).open_writer() as writer:
                    writer.write(records)
            else:
                raise ValueError(f"Unknown data set source_type: {source_type}")
            
            print(f"Processed {table_name} Done")
    
    prepare_data(o, data_sets, "_maxframe_demo", True)

    パラメーター:

    • ALIBABA_CLOUD_ACCESS_KEY_ID: ターゲットMaxComputeプロジェクトにアクセスするには、この環境変数をMaxCompute権限を持つAccessKey IDで設定する必要があります。 AccessKey管理ページからAccessKey IDを取得できます。

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET: この環境変数を、AccessKey IDに対応するAccessKeyシークレットに設定します。

    • your-default-project: MaxComputeプロジェクトの名前。 プロジェクト名を表示するには、MaxComputeコンソールにログインし、左側のナビゲーションウィンドウから [ワークスペース] > [プロジェクト] を選択します。

    • your-end-point: MaxComputeプロジェクトのリージョンのエンドポイントです。 http://service.cn-chengdu.maxcompute.aliyun.com/api などのネットワーク接続方法に基づいて選択できます。 詳細については、「エンドポイント」をご参照ください。

  2. 次のSQLコマンドを使用して、sales_maxframe_demoおよびproduct_maxframe_demoテーブルからデータを取得します。

    --Query sales_maxframe_demo table
    SELECT * FROM sales_maxframe_demo;
    --Return results
    +------------+------------+------------+------------+------------+------------+------------+
    | index      | sale_id    | product_id | user_id    | year       | quantity   | price      |
    +------------+------------+------------+------------+------------+------------+------------+
    | 1          | 1          | 100        | 101        | 2008       | 10         | 5000       |
    | 2          | 2          | 300        | 101        | 2009       | 7          | 4000       |
    | 3          | 4          | 100        | 102        | 2011       | 9          | 4000       |
    | 4          | 5          | 200        | 102        | 2013       | 6          | 6000       |
    | 5          | 8          | 300        | 102        | 2015       | 10         | 9000       |
    | 6          | 9          | 100        | 102        | 2015       | 6          | 2000       |
    +------------+------------+------------+------------+------------+------------+------------+
    
    --Query product_maxframe_demo table data
    SELECT * FROM product_maxframe_demo;
    --Return results
    +------------+------------+--------------+---------------+
    | index      | product_id | product_name | current_price |
    +------------+------------+--------------+---------------+
    | 1          | 100        | Nokia        | 1000          |
    | 2          | 200        | Apple        | 5000          |
    | 3          | 300        | Samsung      | 9000          |
    +------------+------------+--------------+---------------+

データ分析にMaxFrameを使用する

シナリオ1: マージメソッドを使用して2つのデータテーブルを結合し、sales_maxframe_demoテーブル内のすべてのsale_id、対応するproduct_name、および製品のすべての価格を取得します。

  • サンプルコード:

    from odps import ODPS
    from maxframe.session import new_session
    import maxframe.dataframe as md
    import os
    
    o = ODPS(
        # Set the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable to your AccessKey ID.
        # Set the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable to your AccessKey secret.
        # We recommend that you do not directly use the strings of your AccessKey ID and AccessKey secret.
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    session = new_session(o)
    
    # The session ID is a string used to associate MaxFrame tasks, which is important for debugging and tracking task status.
    print(session.session_id)
    
    sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
    product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
    
    # The df will not execute immediately unless you use df.execute() to trigger it.
    # This means that all computations will ultimately be completed entirely in the MaxCompute cluster, avoiding unnecessary data transmission and blocking.
    df = sales.merge(product, left_on="product_id", right_index=True)
    df = df[["product_name", "year", "price"]]
    
    print(df.execute().fetch())
    
    # Save the result to a MaxCompute table and destroy the session.
    md.to_odps_table(df, "result_df", overwrite=True).execute()
    
    session.destroy()

    返される結果 :

    index product_name  year  price                   
    1            Nokia  2008   5000
    2          Samsung  2009   4000
    3            Nokia  2011   4000
    4            Apple  2013   6000
    5          Samsung  2015   9000
    6            Nokia  2015   2000
  • パフォーマンスの比較:

    消費時間を50万レコード (1.96 GB) の売上高テーブルと100,000レコード (3 MB) の製品テーブルと比較すると、次の結果が得られます。

    環境

    時間消費 (単位: 秒)

    ローカルパンダ (V1.3.5)

    65.8

    MaxFrame

    22

シナリオ2: 販売された各製品の販売の最初の年の製品ID、年、数量、および価格を選択します

  • サンプルコード:

    from odps import ODPS
    from maxframe.session import new_session
    import maxframe.dataframe as md
    import os
    
    o = ODPS(
        # Set the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable to your AccessKey ID.
        # Set the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable to your AccessKey secret.
        # We recommend that you do not directly use the strings of your AccessKey ID and AccessKey secret.
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    session = new_session(o)
    
    # The session ID is a string used to associate MaxFrame tasks, which is important for debugging and tracking task status.
    print(session.session_id)
    
    # Aggregate to obtain the first year of each product
    min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index")
    min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min'))
    
    # Join to find the corresponding sales records
    sales = md.read_odps_table("sales_maxframe_demo", index_col=['product_id', 'year'])
    result_df = md.merge(sales, min_year_df, 
                            left_index=True, 
                            right_on=['product_id','first_year'],
                            how='inner')
    
    # The result_df will not execute immediately unless you use result_df.execute() to trigger it.
    # This means that all computations will ultimately be completed entirely in the MaxCompute cluster, avoiding unnecessary data transmission and blocking.
    result_df = result_df[['product_id', 'first_year', 'quantity', 'price']]
    
    print(result_df.execute().fetch())
    
    # Destroy the session
    session.destroy()

    返される結果 :

        product_id  first_year  quantity  price
    100         100        2008        10   5000
    300         300        2009         7   4000
    200         200        2013         6   6000
  • パフォーマンスの比較:

    消費時間を50万レコード (1.96 GB) の売上高テーブルと100,000レコード (3 MB) の製品テーブルと比較すると、次の結果が得られます。

    環境

    時間消費 (単位: 秒)

    ローカルパンダ (V1.3.5)

    186

    MaxFrame

    21

シナリオ3: 各ユーザーの消費量が最も多い製品IDを取得する

説明

このシナリオでは、複数のgroupby、join、drop_duplicates、およびsort_values操作の使用を示します。

  • サンプルコード:

    from odps import ODPS
    from maxframe.session import new_session
    import maxframe.dataframe as md
    import os
    
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    session = new_session(o)
    
    # The session ID is a string used to associate MaxFrame tasks, which is important for debugging and tracking task status.
    print(session.session_id)
    
    sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
    product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
    
    sales['total'] = sales['price'] * sales['quantity']
    
    product_cost_df = sales.groupby(['product_id', 'user_id'], as_index=False).agg(user_product_total=('total','sum'))
    product_cost_df = product_cost_df.merge(product, left_on="product_id", right_index=True, how='right')
    
    user_cost_df = product_cost_df.groupby('user_id').agg(max_total=('user_product_total', 'max'))
    merge_df = product_cost_df.merge(user_cost_df, left_on='user_id', right_index=True)
    
    # The result_df will not execute immediately unless you use result_df.execute() to trigger it.
    # This means that all computations will ultimately be completed entirely in the MaxCompute cluster, avoiding unnecessary data transmission and blocking.
    result_df = merge_df[merge_df['user_product_total'] == merge_df['max_total']][['user_id', 'product_id']].drop_duplicates().sort_values(['user_id'], ascending = [1])
    
    print(result_df.execute().fetch())
    
    # Destroy the session
    
    session.destroy()

    返される結果 :

         user_id  product_id
    100      101         100
    300      102         300
  • パフォーマンスの比較:

    消費時間を50万レコード (1.96 GB) の売上高テーブルと100,000レコード (3 MB) の製品テーブルと比較すると、次の結果が得られます。

    環境

    時間消費 (単位: 秒)

    ローカルパンダ (V1.3.5)

    176

    MaxFrame

    85

結論

MaxFrameはPandasインターフェイスとシームレスに統合され、自動分散処理を可能にします。 堅牢なデータ処理機能を維持しながら、データ分析と計算の規模と効率を大幅に向上させます。