MaxFrameを使用すると、分散環境でPandasと同じAPIを使用してデータを分析できます。 MaxFrameを使用すると、MaxComputeでのデータ分析と計算を、オープンソースのPandasよりも数十倍高速なパフォーマンスで実現できます。 このトピックでは、MaxFrame内で一般的なPandas演算子を使用する方法について説明します。
前提条件
MaxFrameがインストールされました。 詳細については、「準備」をご参照ください。
データの準備
MaxFrameがインストールされたPython環境で次のコードを実行して、テストテーブルとデータを準備します。
from odps import ODPS from maxframe.session import new_session import maxframe.dataframe as md import pandas as pd import os o = ODPS( # Ensure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set to your AccessKey ID. # Ensure that the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set to your AccessKey secret. # We recommend that you do not directly use the strings of your AccessKey ID and AccessKey secret. os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) data_sets = [{ "table_name": "product", "table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint", "source_type": "records", "records" : [ [1, 100, 'Nokia', 1000], [2, 200, 'Apple', 5000], [3, 300, 'Samsung', 9000] ], }, { "table_name" : "sales", "table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint", "source_type": "records", "records" : [ [1, 1, 100, 101, 2008, 10, 5000], [2, 2, 300, 101, 2009, 7, 4000], [3, 4, 100, 102, 2011, 9, 4000], [4, 5, 200, 102, 2013, 6, 6000], [5, 8, 300, 102, 2015, 10, 9000], [6, 9, 100, 102, 2015, 6, 2000] ], "lifecycle": 5 }] def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False): for index, data in enumerate(data_sets): table_name = data.get("table_name") table_schema = data.get("table_schema") source_type = data.get("source_type") if not table_name or not table_schema or not source_type: raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.") lifecycle = data.get("lifecycle", 5) table_name += suffix print(f"Processing {table_name}...") if drop_if_exists: print(f"Deleting {table_name}...") o.delete_table(table_name, if_exists=True) o.create_table(name=table_name, table_schema=table_schema, lifecycle=lifecycle, if_not_exists=True) if source_type == "local_file": file_path = data.get("file") if not file_path: raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.") sep = data.get("sep", ",") pd_df = pd.read_csv(file_path, sep=sep) ODPSDataFrame(pd_df).persist(table_name, drop_table=True) elif source_type == 'records': records = data.get("records") if not records: raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.") with o.get_table(table_name).open_writer() as writer: writer.write(records) else: raise ValueError(f"Unknown data set source_type: {source_type}") print(f"Processed {table_name} Done") prepare_data(o, data_sets, "_maxframe_demo", True)
パラメーター:
ALIBABA_CLOUD_ACCESS_KEY_ID: ターゲットMaxComputeプロジェクトにアクセスするには、この環境変数をMaxCompute権限を持つAccessKey IDで設定する必要があります。 AccessKey管理ページからAccessKey IDを取得できます。
ALIBABA_CLOUD_ACCESS_KEY_SECRET: この環境変数を、AccessKey IDに対応するAccessKeyシークレットに設定します。
your-default-project: MaxComputeプロジェクトの名前。 プロジェクト名を表示するには、MaxComputeコンソールにログインし、左側のナビゲーションウィンドウから [ワークスペース] > [プロジェクト] を選択します。
your-end-point: MaxComputeプロジェクトのリージョンのエンドポイントです。
http://service.cn-chengdu.maxcompute.aliyun.com/api
などのネットワーク接続方法に基づいて選択できます。 詳細については、「エンドポイント」をご参照ください。
次のSQLコマンドを使用して、sales_maxframe_demoおよびproduct_maxframe_demoテーブルからデータを取得します。
--Query sales_maxframe_demo table SELECT * FROM sales_maxframe_demo; --Return results +------------+------------+------------+------------+------------+------------+------------+ | index | sale_id | product_id | user_id | year | quantity | price | +------------+------------+------------+------------+------------+------------+------------+ | 1 | 1 | 100 | 101 | 2008 | 10 | 5000 | | 2 | 2 | 300 | 101 | 2009 | 7 | 4000 | | 3 | 4 | 100 | 102 | 2011 | 9 | 4000 | | 4 | 5 | 200 | 102 | 2013 | 6 | 6000 | | 5 | 8 | 300 | 102 | 2015 | 10 | 9000 | | 6 | 9 | 100 | 102 | 2015 | 6 | 2000 | +------------+------------+------------+------------+------------+------------+------------+ --Query product_maxframe_demo table data SELECT * FROM product_maxframe_demo; --Return results +------------+------------+--------------+---------------+ | index | product_id | product_name | current_price | +------------+------------+--------------+---------------+ | 1 | 100 | Nokia | 1000 | | 2 | 200 | Apple | 5000 | | 3 | 300 | Samsung | 9000 | +------------+------------+--------------+---------------+
データ分析にMaxFrameを使用する
シナリオ1: マージメソッドを使用して2つのデータテーブルを結合し、sales_maxframe_demoテーブル内のすべてのsale_id
、対応するproduct_name
、および製品のすべての年
と価格
を取得します。
サンプルコード:
from odps import ODPS from maxframe.session import new_session import maxframe.dataframe as md import os o = ODPS( # Set the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable to your AccessKey ID. # Set the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable to your AccessKey secret. # We recommend that you do not directly use the strings of your AccessKey ID and AccessKey secret. os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) session = new_session(o) # The session ID is a string used to associate MaxFrame tasks, which is important for debugging and tracking task status. print(session.session_id) sales = md.read_odps_table("sales_maxframe_demo", index_col="index") product = md.read_odps_table("product_maxframe_demo", index_col="product_id") # The df will not execute immediately unless you use df.execute() to trigger it. # This means that all computations will ultimately be completed entirely in the MaxCompute cluster, avoiding unnecessary data transmission and blocking. df = sales.merge(product, left_on="product_id", right_index=True) df = df[["product_name", "year", "price"]] print(df.execute().fetch()) # Save the result to a MaxCompute table and destroy the session. md.to_odps_table(df, "result_df", overwrite=True).execute() session.destroy()
返される結果 :
index product_name year price 1 Nokia 2008 5000 2 Samsung 2009 4000 3 Nokia 2011 4000 4 Apple 2013 6000 5 Samsung 2015 9000 6 Nokia 2015 2000
パフォーマンスの比較:
消費時間を50万レコード (1.96 GB) の売上高テーブルと100,000レコード (3 MB) の製品テーブルと比較すると、次の結果が得られます。
環境
時間消費 (単位: 秒)
ローカルパンダ (V1.3.5)
65.8
MaxFrame
22
シナリオ2: 販売された各製品の販売の最初の年の製品ID、年、数量、および価格を選択します
サンプルコード:
from odps import ODPS from maxframe.session import new_session import maxframe.dataframe as md import os o = ODPS( # Set the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable to your AccessKey ID. # Set the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable to your AccessKey secret. # We recommend that you do not directly use the strings of your AccessKey ID and AccessKey secret. os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) session = new_session(o) # The session ID is a string used to associate MaxFrame tasks, which is important for debugging and tracking task status. print(session.session_id) # Aggregate to obtain the first year of each product min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index") min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min')) # Join to find the corresponding sales records sales = md.read_odps_table("sales_maxframe_demo", index_col=['product_id', 'year']) result_df = md.merge(sales, min_year_df, left_index=True, right_on=['product_id','first_year'], how='inner') # The result_df will not execute immediately unless you use result_df.execute() to trigger it. # This means that all computations will ultimately be completed entirely in the MaxCompute cluster, avoiding unnecessary data transmission and blocking. result_df = result_df[['product_id', 'first_year', 'quantity', 'price']] print(result_df.execute().fetch()) # Destroy the session session.destroy()
返される結果 :
product_id first_year quantity price 100 100 2008 10 5000 300 300 2009 7 4000 200 200 2013 6 6000
パフォーマンスの比較:
消費時間を50万レコード (1.96 GB) の売上高テーブルと100,000レコード (3 MB) の製品テーブルと比較すると、次の結果が得られます。
環境
時間消費 (単位: 秒)
ローカルパンダ (V1.3.5)
186
MaxFrame
21
シナリオ3: 各ユーザーの消費量が最も多い製品IDを取得する
このシナリオでは、複数のgroupby、join、drop_duplicates、およびsort_values操作の使用を示します。
サンプルコード:
from odps import ODPS from maxframe.session import new_session import maxframe.dataframe as md import os o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) session = new_session(o) # The session ID is a string used to associate MaxFrame tasks, which is important for debugging and tracking task status. print(session.session_id) sales = md.read_odps_table("sales_maxframe_demo", index_col="index") product = md.read_odps_table("product_maxframe_demo", index_col="product_id") sales['total'] = sales['price'] * sales['quantity'] product_cost_df = sales.groupby(['product_id', 'user_id'], as_index=False).agg(user_product_total=('total','sum')) product_cost_df = product_cost_df.merge(product, left_on="product_id", right_index=True, how='right') user_cost_df = product_cost_df.groupby('user_id').agg(max_total=('user_product_total', 'max')) merge_df = product_cost_df.merge(user_cost_df, left_on='user_id', right_index=True) # The result_df will not execute immediately unless you use result_df.execute() to trigger it. # This means that all computations will ultimately be completed entirely in the MaxCompute cluster, avoiding unnecessary data transmission and blocking. result_df = merge_df[merge_df['user_product_total'] == merge_df['max_total']][['user_id', 'product_id']].drop_duplicates().sort_values(['user_id'], ascending = [1]) print(result_df.execute().fetch()) # Destroy the session session.destroy()
返される結果 :
user_id product_id 100 101 100 300 102 300
パフォーマンスの比較:
消費時間を50万レコード (1.96 GB) の売上高テーブルと100,000レコード (3 MB) の製品テーブルと比較すると、次の結果が得られます。
環境
時間消費 (単位: 秒)
ローカルパンダ (V1.3.5)
176
MaxFrame
85
結論
MaxFrameはPandasインターフェイスとシームレスに統合され、自動分散処理を可能にします。 堅牢なデータ処理機能を維持しながら、データ分析と計算の規模と効率を大幅に向上させます。