MaxFrame は、データ処理のための Pandas と互換性のある API を提供します。これらの API には、フィルタリング、射影、連結、集約のための基本的な API と、ユーザー定義関数 (UDF) を呼び出すための transform や apply などの高度な API が含まれます。高度な API は、特定のビジネスロジックやデータ操作を実装するために使用でき、標準のオペレーターが使用できない複雑なシナリオに適しています。MaxFrame は、ビッグデータ処理の要件を満たすための特定の API を提供します。たとえば、read_odps_table API を使用して MaxCompute テーブルからデータを読み取り、to_odps_table API を使用して MaxCompute テーブルにデータを書き込み、execute API を使用して遅延コンピューティングを実行できます。これらの API を使用して、オンプレミスの計算リソースに制限されることなく、ビッグデータ環境のデータを効率的に分析できます。
データ準備
このトピックでは、MaxCompute のパブリックデータセットの maxframe_ml_100k_users テーブルを使用して、MaxFrame の使用方法を説明します。テストデータは、パブリック MaxCompute プロジェクト BIGDATA_PUBLIC_DATASET の data_science スキーマに保存されています。このデータは直接使用できます。
セッションの初期化
MaxFrame ジョブを実行する前に、MaxFrame セッションを初期化する必要があります。コードのエントリーポイントで、new_session API 操作を呼び出してジョブを初期化できます。その後のすべてのデータ処理は、作成されたセッションオブジェクトを介して実行され、バックエンドサービスと対話します。次のコードに例を示します。
import os
from maxframe import new_session
from odps import ODPS
# MaxFrame アカウントを使用して MaxCompute を初期化します。
o = ODPS(
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
# MaxFrame セッションを初期化します。
new_session(odps_entry=o)new_session API 操作の詳細については、「new_session」をご参照ください。
DataFrame オブジェクトの作成
read_odps_table および read_odps_query API 操作を使用すると、MaxCompute テーブルから DataFrame オブジェクトを作成できます。これらの DataFrame オブジェクトは、Pandas スタイルのデータ操作をサポートします。MaxFrame は、ローカルデータを使用した DataFrame オブジェクトの初期化もサポートしています。この機能は、コードを迅速にテストおよび開発するのに役立ちます。
MaxCompute テーブルの使用
read_odps_tableAPI 操作を使用して、次の操作を実行できます。read_odps_table関数を使用して MaxCompute テーブルからデータを読み取り、それを MaxFrame DataFrame オブジェクトに変換できます。import maxframe.dataframe as md # MaxCompute テーブルからデータを読み取り、DataFrame オブジェクトを作成します。 df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users')index_col パラメーターを設定して、テーブル内の列を DataFrame オブジェクトのインデックスとして指定します。
import maxframe.dataframe as md # データベースの user_id 列を DataFrame オブジェクトのインデックスとして使用します。 df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', index_col="user_id")columns パラメーターを設定して、テーブル内の列を指定して DataFrame オブジェクトを作成します。
import maxframe.dataframe as md # テーブル内の列を選択して DataFrame オブジェクトを作成します。 df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', columns=['user_id', 'age', 'sex'])read_odps_tableAPI の詳細については、「read_odps_table」をご参照ください。
MaxCompute SQL のクエリ結果の使用
テーブルから DataFrame を作成することに加えて、MaxFrame では
read_odps_queryAPI 操作で SQL クエリを実行し、その結果を DataFrame のデータ入力として使用できます。import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`')index_col パラメーターを使用して、DataFrame オブジェクトのインデックスを指定します。
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`', index_col='user_id')read_odps_queryAPI の詳細については、「read_odps_query」をご参照ください。オンプレミスデータの使用
MaxFrame は Pandas と同様の使用体験を提供します。オンプレミスデータに基づいて MaxFrame DataFrame オブジェクトを直接作成できます。サンプルコード:
import maxframe.dataframe as md d = {'col1': [1, 2], 'col2': [3, 4]} df = md.DataFrame(data=d) print(df.execute().fetch()) # 返された結果を取得します。 col1 col2 0 1 3 1 2 4 df = md.DataFrame(data=d, dtype=np.int8) print(df.execute().fetch()) # 返された結果を取得します。 col1 col2 0 1 3 1 2 4 d = {'col1': [0, 1, 2, 3], 'col2': pd.Series([2, 3], index=[2, 3])} df = md.DataFrame(data=d, index=[0, 1, 2, 3]) print(df.execute().fetch()) # 返された結果を取得します。 col1 col2 0 0 NaN 1 1 NaN 2 2 2.0 3 3 3.0 data = np.array([(1, 2, 3), (4, 5, 6), (7, 8, 9)], dtype=[("a", "i4"), ("b", "i4"), ("c", "i4")]) df = md.DataFrame(data, columns=['c', 'a']) df.execute().fetch() # 返された結果を取得します。 c a 0 3 1 1 6 4 2 9 7詳細については、「DataFrame」をご参照ください。
データ処理
MaxFrame は、Pandas と互換性のある一連の API を提供します。これらの API を使用して、データ計算、射影、フィルタリング、ソートなどのさまざまな操作を実行できます。MaxFrame は、一般的なデータ処理のための幅広いオペレーターもサポートしています。MaxFrame は、データ処理と分析の要件を満たすために UDF もサポートしています。
算術演算
加算、減算、乗算、除算などのさまざまな算術演算を DataFrame オブジェクトで直接実行できます。次の例は、MaxFrame を使用して基本的な算術演算を実行する方法を示しています。
例 1: 単純なデータ加算を実行します。
import maxframe.dataframe as md df = md.DataFrame({'angles': [0, 3, 4], 'degrees': [360, 180, 360]}, index=['circle', 'triangle', 'rectangle']) print(df.execute().fetch()) # 返された結果を取得します。 angles degrees circle 0 360 triangle 3 180 rectangle 4 360 df = df + 1 print(df.execute().fetch()) # 返された結果を取得します。 angles degrees circle 1 361 triangle 4 181 rectangle 5 361例 2: DataFrame オブジェクト間で乗算を実行します。
import maxframe.dataframe as md df = md.DataFrame({'angles': [0, 3, 4], 'degrees': [360, 180, 360]}, index=['circle', 'triangle', 'rectangle']) other = md.DataFrame({'angles': [0, 3, 4]}, index=['circle', 'triangle', 'rectangle']) print(df.mul(other, fill_value=0).execute()) # 返された結果を取得します。 angles degrees circle 0 0.0 triangle 9 0.0 rectangle 16 0.0
フィルタリング、射影、サンプリング
フィルタリング
フィルタリングを使用すると、特定の基準に基づいて DataFrame オブジェクトからデータを選択または除外できます。この操作は、大規模なデータセットの処理と分析に不可欠であり、最も関連性の高い情報に集中するのに役立ちます。
例 1: データの最初の数行を表示します。
import maxframe.dataframe as md df = md.DataFrame({'animal': ['alligator', 'bee', 'falcon', 'lion', 'monkey', 'parrot', 'shark', 'whale', 'zebra']}) print(df.head().execute().fetch()) # 返された結果を取得します。 animal 0 alligator 1 bee 2 falcon 3 lion 4 monkey例 2: 指定した列を削除します。
import maxframe.dataframe as md df = md.DataFrame(np.arange(12).reshape(3, 4), columns=['A', 'B', 'C', 'D']) print(df.drop(['B', 'C'], axis=1).execute().fetch()) # 返された結果を取得します。 A D 0 0 3 1 4 7 2 8 11
射影
射影を使用すると、表示する列を調整したり、列の順序を再配置したりするなど、DataFrame オブジェクトの構造を再形成できます。射影操作を実行して、データの簡略化されたビューを作成したり、特定の分析要件を満たすようにデータの表示方法を調整したりできます。
例: 列名を変更します。
import maxframe.dataframe as md
df = md.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
print(df.set_axis(['I', 'II'], axis='columns').execute().fetch())
# 返された結果を取得します。
I II
0 1 4
1 2 5
2 3 6 サンプリング
サンプリングは、DataFrame オブジェクトからランダムなサンプルを選択するプロセスです。サンプリングは、大規模なデータセットを処理し、データの統計的特性を推定するために不可欠です。
例: ランダムサンプリングを実行します。
import maxframe.dataframe as md
df = md.DataFrame({'num_legs': [2, 4, 8, 0],
'num_wings': [2, 0, 0, 0],
'num_specimen_seen': [10, 2, 1, 8]},
index=['falcon', 'dog', 'spider', 'fish'])
print(df['num_legs'].sample(n=3, random_state=1).execute())
# 返された結果を取得します。
falcon 2
fish 0
dog 4
Name: num_legs, dtype: int64フィルタリング、射影、サンプリング操作の詳細については、「インデックス再作成/選択/ラベル操作」をご参照ください。
ソート
ソートを使用すると、1 つ以上の列の値に基づいて DataFrame オブジェクト内の行の順序を再配置できます。ほとんどの場合、ソートはデータ分析で使用されます。ソート操作を実行して、データのパターン、傾向、例外を簡単に観察できます。
例: 1 つ以上の列の値をソートします。
import maxframe.dataframe as md
import numpy as np
df = md.DataFrame({
'col1': ['A', 'A', 'B', np.nan, 'D', 'C'],
'col2': [2, 1, 9, 8, 7, 4],
'col3': [0, 1, 9, 4, 2, 3],
})
res = df.sort_values(by=['col1']).execute()
print(res.fetch())
# 返された結果を取得します。
col1 col2 col3
0 A 2 0
1 A 1 1
2 B 9 9
5 C 4 3
4 D 7 2
3 None 8 4
res = df.sort_values(by=['col1', 'col2']).execute()
print(res.fetch())
# 返された結果を取得します。
col1 col2 col3
1 A 1 1
0 A 2 0
2 B 9 9
5 C 4 3
4 D 7 2
3 None 8 4 ソート操作の詳細については、「再形成/ソート/転置」をご参照ください。
結合、マージ、連結
結合、マージ、連結は、基本的かつ強力なデータ処理操作です。これらの操作により、特定のパブリックフィールドまたはインデックスに基づいて、さまざまなデータセットを水平または垂直に組み合わせることができます。MaxFrame は、データセットを簡単に結合するのに役立つ、結合、マージ、連結に関連する API を提供します。
例: データを水平に結合します。
import maxframe.dataframe as md
df = md.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
other = md.DataFrame({'key': ['K0', 'K1', 'K2'],
'B': ['B0', 'B1', 'B2']})
print(df.join(other, lsuffix='_caller', rsuffix='_other').execute().fetch())
# 返された結果を取得します。
key_caller A key_other B
0 K0 A0 K0 B0
1 K1 A1 K1 B1
2 K2 A2 K2 B2
3 K3 A3 None None
4 K4 A4 None None
5 K5 A5 None None結合、マージ、連結操作の詳細については、「結合/マージ/連結」をご参照ください。
集約と UDF
集約
集約は、値のグループを単一の値に変換するプロセスです。集約操作を実行して、データ分析中にデータの統計的特性を要約および発見できます。
例: 複数タイプの集約を実行します。
import maxframe.dataframe as md
df = md.DataFrame([[1, 2, 3],
[4, 5, 6],
[7, 8, 9],
[np.nan, np.nan, np.nan]],
columns=['A', 'B', 'C'])
print(df.agg(['sum', 'min']).execute().fetch())
# 返された結果を取得します。
A B C
min 1.0 2.0 3.0
sum 12.0 15.0 18.0
df.agg({'A' : ['sum', 'min'], 'B' : ['min', 'max']}).execute().fetch()
# 返された結果を取得します。
A B
max NaN 8.0
min 1.0 2.0
sum 12.0 NaNUDF
標準のデータ処理オペレーターに加えて、MaxFrame は UDF をサポートし、DataFrame オブジェクトで UDF 処理を実行します。これにより、データ処理の柔軟性が向上し、カスタムロジックを使用してデータセットでより多くの操作を実行できます。
UDF を実行する前に、`new_session` を呼び出す前に config.options.sql.settings パラメーターを使用して共通イメージを宣言する必要があります。
例 1: transform メソッドを使用して UDF を呼び出します。
import maxframe.dataframe as md from maxframe import config config.options.sql.settings = { "odps.session.image": "common", "odps.sql.type.system.odps2": "true" } session = new_session(o) df = md.DataFrame({'A': range(3), 'B': range(1, 4)}) print(df.transform(lambda x: x + 1).execute().fetch()) # 返された結果を取得します。 A B 0 1 2 1 2 3 2 3 4例 2: apply メソッドを使用して UDF を呼び出します。
UDF の実行前後で列数を変更したい場合は、apply メソッドを使用して UDF を呼び出すことができます。
import maxframe.dataframe as md import numpy as np from maxframe import config config.options.sql.settings = { "odps.session.image": "common", "odps.sql.type.system.odps2": "true" } session = new_session(o) def simple(row): row['is_man'] = row['sex'] == "man" return row df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users') new_dtypes = df.dtypes.copy() new_dtypes["is_man"] = np.dtype(np.bool_) df.apply( simple, axis=1, result_type="expand", output_type="dataframe", dtypes=new_dtypes ).execute().fetch() # 返された結果を取得します。 user_id age sex occupation zip_code is_man 0 1 24 M technician 85711 False 1 2 53 F other 94043 False 2 3 23 M writer 32067 False 3 4 24 M technician 43537 False 4 5 33 F other 15213 False .. ... ... .. ... ... ... 938 939 26 F student 33319 False 939 940 32 M administrator 02215 False 940 941 20 M student 97229 False 941 942 48 F librarian 78209 False 942 943 22 M student 77841 False [943 rows x 6 columns]
集約と UDF 操作、および集計関数の選択方法の詳細については、「関数適用/GroupBy/ウィンドウ」をご参照ください。
結果の保存
データセットが変換された後、to_odps_table API 操作を使用して結果を MaxCompute テーブルに保存できます。
例 1: 処理されたデータを MaxCompute テーブルに書き込みます。
# 処理されたデータを MaxCompute テーブルに書き込みます。 filtered_df.to_odps_table('<table_name>')例 2: ストレージ期間を指定します。
lifecycle パラメーターを設定して、結果テーブルのライフサイクルを指定できます。
# 処理されたデータを MaxCompute テーブルに書き込みます。 filtered_df.to_odps_table('<table_name>', lifecycle = 7)
パラメーター
table_name: 宛先の MaxCompute テーブルの名前。テーブルが MaxCompute に存在しない場合、テーブルは自動的に作成されます。テーブルが既に存在する場合、データはデフォルトで追加されます。
overwrite パラメーターを使用して、既存のデータを上書きするかどうかを指定できます。ストレージ操作の詳細については、「to_odps_table」をご参照ください。
タスクの実行と実行結果の表示
execute() メソッドを使用してデータ処理タスクをトリガーし、fetch() メソッドを使用して結果データの一部を取得できます。
例: 実行結果を取得して表示します。
execute() と fetch() メソッドを追加してデータ処理フローを完了し、結果を表示できます。Pandas と比較して、MaxFrame はその遅延評価モデルを使用して、大規模なデータセットを効率的に処理し、不要なデータ転送を削減します。
# 実行結果の部分的なデータを取得します。
data = filtered_df.execute().fetch()
print(data)