MaxFrame為您提供相容Pandas的API介面,用於資料處理。其中包括篩選、投影、拼接和彙總等基本API,及用於調用自訂函數的進階API(如transform、apply),進階API可以實現特定商務邏輯和資料操作,從而解決標準運算元可能無法覆蓋複雜情境的問題。同時MaxFrame結合巨量資料的處理需求,引入了特有的API介面,如讀寫MaxCompute表格式資料(read_odps_table、to_odps_table)、執行延遲計算(execute)等,讓您可以更高效地在巨量資料環境下進行資料分析,不受本地計算資源的限制。
資料準備
本文以MaxCompute公用資料集中的maxframe_ml_100k_users表為例,為您介紹MaxFrame的使用。測試資料已存放在MaxCompute公開專案BIGDATA_PUBLIC_DATASET下的data_science Schema中,您可直接使用。
初始化會話
執行MaxFrame作業前,首先需要初始化MaxFrame會話(Session)。在代碼的入口處,通過調用new_session 介面初始化整個作業。後續的資料處理將藉助所構建的會話對象與後端服務進行互動,以執行各種資料操作。樣本如下:
import os
from maxframe import new_session
from odps import ODPS
# 使用MaxFrame相關帳號初始化ODPS
o = ODPS(
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
# 初始化MaxFrame會話
new_session(odps_entry=o)更多關於new_session介面的使用細節,請參見new_session。
構建DataFrame對象
read_odps_table和read_odps_query介面允許您基於MaxCompute表建立DataFrame對象,且這些DataFrame對象支援Pandas式的資料操作。MaxFrame還支援使用本機資料初始化DataFrame對象,便於您快速進行測試和開發。
使用MaxCompute表
您可通過
read_odps_table介面執行以下操作:通過
read_odps_table介面讀取MaxCompute表資料,並將其轉換為MaxFrame DataFrame對象。import maxframe.dataframe as md # 從MaxCompute表讀取資料,建立DataFrame df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users')通過指定index_col參數,選擇表中的某列作為DataFrame的索引。
import maxframe.dataframe as md # 使用資料庫的id列作為DataFrame的index df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', index_col="user_id")通過指定columns參數,選擇表中的部分列構建DataFrame對象。
import maxframe.dataframe as md # 擇表中的部分列構建DataFrame對象 df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', columns=['user_id', 'age', 'sex'])更多關於
read_odps_tableAPI的使用細節,請參見read_odps_table。
使用MaxCompute SQL查詢結果
除了直接使用表構建DataFrame,MaxFrame還支援通過
read_odps_query介面執行SQL查詢,並以查詢結果作為DataFrame的資料輸入。import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`')使用index_col參數指定DataFrame的索引:
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`', index_col='user_id')更多關於
read_odps_queryAPI的使用細節,請參見read_odps_query。使用本地定義的資料
MaxFrame支援類似於Pandas的使用體驗,可以基於本機資料直接構建MaxFrame DataFrame對象。樣本如下:
import maxframe.dataframe as md d = {'col1': [1, 2], 'col2': [3, 4]} df = md.DataFrame(data=d) print(df.execute().fetch()) # 返回結果 col1 col2 0 1 3 1 2 4 df = md.DataFrame(data=d, dtype=np.int8) print(df.execute().fetch()) # 返回結果 col1 col2 0 1 3 1 2 4 d = {'col1': [0, 1, 2, 3], 'col2': pd.Series([2, 3], index=[2, 3])} df = md.DataFrame(data=d, index=[0, 1, 2, 3]) print(df.execute().fetch()) # 返回結果 col1 col2 0 0 NaN 1 1 NaN 2 2 2.0 3 3 3.0 data = np.array([(1, 2, 3), (4, 5, 6), (7, 8, 9)], dtype=[("a", "i4"), ("b", "i4"), ("c", "i4")]) df = md.DataFrame(data, columns=['c', 'a']) df.execute().fetch() # 返回結果 c a 0 3 1 1 6 4 2 9 7更多使用細節,請參見DataFrame。
資料處理
MaxFrame提供了一系列相容Pandas的API介面,覆蓋了資料計算、投影、過濾及排序等多種操作,且支援豐富的運算元用於常規資料處理。除此之外,MaxFrame還支援UDF,以滿足使用者使用自訂函數進行資料加工和分析的需求。
數學計算
各種數學計算操作能夠直接在DataFrame上進行,包括加法、減法、乘法及除法等。下述樣本為您展示如何使用MaxFrame進行基本的算術操作。
樣本1:簡單的資料加法。
import maxframe.dataframe as md df = md.DataFrame({'angles': [0, 3, 4], 'degrees': [360, 180, 360]}, index=['circle', 'triangle', 'rectangle']) print(df.execute().fetch()) # 返回結果 angles degrees circle 0 360 triangle 3 180 rectangle 4 360 df = df + 1 print(df.execute().fetch()) # 返回結果 angles degrees circle 1 361 triangle 4 181 rectangle 5 361樣本2:DataFrame之間的乘法。
import maxframe.dataframe as md df = md.DataFrame({'angles': [0, 3, 4], 'degrees': [360, 180, 360]}, index=['circle', 'triangle', 'rectangle']) other = md.DataFrame({'angles': [0, 3, 4]}, index=['circle', 'triangle', 'rectangle']) print(df.mul(other, fill_value=0).execute()) # 返回結果 angles degrees circle 0 0.0 triangle 9 0.0 rectangle 16 0.0
更多關於數學運算介面的資訊,請參見Binary operator functions和Computations / descriptive stats。
過濾/投影/抽樣
過濾
過濾操作允許您根據特定的條件從DataFrame中選擇或排除資料。該操作對於處理和分析大型資料集至關重要,允許您集中關注其中最具相關性的資訊。
樣本1:顯示前幾行資料。
import maxframe.dataframe as md df = md.DataFrame({'animal': ['alligator', 'bee', 'falcon', 'lion', 'monkey', 'parrot', 'shark', 'whale', 'zebra']}) print(df.head().execute().fetch()) # 返回結果 animal 0 alligator 1 bee 2 falcon 3 lion 4 monkey樣本2:刪除指定的列。
import maxframe.dataframe as md df = md.DataFrame(np.arange(12).reshape(3, 4), columns=['A', 'B', 'C', 'D']) print(df.drop(['B', 'C'], axis=1).execute().fetch()) # 返回結果 A D 0 0 3 1 4 7 2 8 11
投影
您可以通過投影重塑DataFrame的結構,包括要展示的列或者重新安排列的順序。通過投影操作,可以建立資料的簡化視圖或者調整資料的展示方式,以滿足特定的分析目的。
樣本:修改列名
import maxframe.dataframe as md
df = md.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
print(df.set_axis(['I', 'II'], axis='columns').execute().fetch())
# 返回結果
I II
0 1 4
1 2 5
2 3 6 抽樣
抽樣是一種選擇DataFrame中隨機樣本的過程,對於處理大型資料集以及估計資料的統計屬性尤為重要。
樣本:隨機抽樣
import maxframe.dataframe as md
df = md.DataFrame({'num_legs': [2, 4, 8, 0],
'num_wings': [2, 0, 0, 0],
'num_specimen_seen': [10, 2, 1, 8]},
index=['falcon', 'dog', 'spider', 'fish'])
print(df['num_legs'].sample(n=3, random_state=1).execute())
# 返回結果
falcon 2
fish 0
dog 4
Name: num_legs, dtype: int64更多關於過濾、投影和抽樣的資訊請參見Reindexing / selection / label manipulation。
排序
排序允許您根據一列或多列的值重新排列DataFrame中行的順序,常用於資料分析。通過排序,可以更容易地觀察到資料的模式、趨勢和異常點。
樣本:按照單列或多列排序
import maxframe.dataframe as md
import numpy as np
df = md.DataFrame({
'col1': ['A', 'A', 'B', np.nan, 'D', 'C'],
'col2': [2, 1, 9, 8, 7, 4],
'col3': [0, 1, 9, 4, 2, 3],
})
res = df.sort_values(by=['col1']).execute()
print(res.fetch())
# 返回結果
col1 col2 col3
0 A 2 0
1 A 1 1
2 B 9 9
5 C 4 3
4 D 7 2
3 None 8 4
res = df.sort_values(by=['col1', 'col2']).execute()
print(res.fetch())
# 返回結果
col1 col2 col3
1 A 1 1
0 A 2 0
2 B 9 9
5 C 4 3
4 D 7 2
3 None 8 4 更多關於排序操作的細節,請參見Reshaping / sorting / transposing。
拼接
拼接操作是資料處理中非常基本且強大的工具,允許您將不同的資料集基於某些公用欄位或索引,水平或垂直地結合起來。MaxFrame提供了拼接介面,協助您輕鬆完成資料集合拼接操作。
樣本:水平拼接
import maxframe.dataframe as md
df = md.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
other = md.DataFrame({'key': ['K0', 'K1', 'K2'],
'B': ['B0', 'B1', 'B2']})
print(df.join(other, lsuffix='_caller', rsuffix='_other').execute().fetch())
# 返回結果
key_caller A key_other B
0 K0 A0 K0 B0
1 K1 A1 K1 B1
2 K2 A2 K2 B2
3 K3 A3 None None
4 K4 A4 None None
5 K5 A5 None None更多關於拼接操作的使用細節和方法,請參見Combining / joining / merging。
彙總/UDF
彙總
彙總操作是將一組值轉換為一個單一值的過程。在資料分析中,彙總是基本的資料歸納工具,用於總結、探索資料的統計特徵。
樣本:多類型彙總
import maxframe.dataframe as md
df = md.DataFrame([[1, 2, 3],
[4, 5, 6],
[7, 8, 9],
[np.nan, np.nan, np.nan]],
columns=['A', 'B', 'C'])
print(df.agg(['sum', 'min']).execute().fetch())
# 返回結果
A B C
min 1.0 2.0 3.0
sum 12.0 15.0 18.0
df.agg({'A' : ['sum', 'min'], 'B' : ['min', 'max']}).execute().fetch()
# 返回結果
A B
max NaN 8.0
min 1.0 2.0
sum 12.0 NaNUDF
除了標準的資料處理運算元外,MaxFrame還支援UDF,以支援在DataFrame對象上執行使用者自訂的函數處理。這增加了資料處理的靈活性,允許使用自訂的邏輯對資料集進行更豐富的操作。
執行UDF前,需要在new_session之前通過config.options.sql.settings參數聲明使用common鏡像。
樣本1:使用transform方法調用自訂函數
import maxframe.dataframe as md from maxframe import config config.options.sql.settings = { "odps.session.image": "common", "odps.sql.type.system.odps2": "true" } session = new_session(o) df = md.DataFrame({'A': range(3), 'B': range(1, 4)}) print(df.transform(lambda x: x + 1).execute().fetch()) # 返回結果 A B 0 1 2 1 2 3 2 3 4樣本2:使用apply方法調用自訂函數
如果您希望UDF執行前後資料的列數發生變化,可以使用apply方法來調用自訂函數。
import maxframe.dataframe as md import numpy as np from maxframe import config config.options.sql.settings = { "odps.session.image": "common", "odps.sql.type.system.odps2": "true" } session = new_session(o) def simple(row): row['is_man'] = row['sex'] == "man" return row df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users') new_dtypes = df.dtypes.copy() new_dtypes["is_man"] = np.dtype(np.bool_) df.apply( simple, axis=1, result_type="expand", output_type="dataframe", dtypes=new_dtypes ).execute().fetch() # 返回結果 user_id age sex occupation zip_code is_man 0 1 24 M technician 85711 False 1 2 53 F other 94043 False 2 3 23 M writer 32067 False 3 4 24 M technician 43537 False 4 5 33 F other 15213 False .. ... ... .. ... ... ... 938 939 26 F student 33319 False 939 940 32 M administrator 02215 False 940 941 20 M student 97229 False 941 942 48 F librarian 78209 False 942 943 22 M student 77841 False [943 rows x 6 columns]
對於彙總/UDF操作中,如何選擇彙總函式等更多使用細節,請參見Function application / GroupBy / window。
結果儲存
資料集被轉換後,可以使用to_odps_table將結果儲存至MaxCompute表中。
樣本1:將處理後的資料寫入MaxCompute表。
# 將處理後的資料寫入MaxCompute表 filtered_df.to_odps_table('<table_name>')樣本2:指定儲存的周期。
通過指定lifecycle參數,指定結果資料表存活的生命週期。
# 將處理後的資料寫入MaxCompute表 filtered_df.to_odps_table('<table_name>', lifecycle = 7)
參數說明:
table_name:待寫入資料的目標MaxCompute表名。若MaxCompute中表不存在,系統會自動建立。若該表已存在,資料會被預設追加至該表中。
您可通過overwrite參數設定是否覆蓋原有資料。更多關於儲存操作的細節,請參見to_odps_table。
執行任務並查看執行結果
您可以通過execute()方法觸發資料處理任務,並使用fetch()方法擷取部分執行結果資料。
樣本:擷取並展示執行結果。
通過追加execute()和fetch()方法,完成資料處理流程並查看結果。相比於Pandas,MaxFrame允許有效地處理大規模資料集,並通過延遲計算模式來減少不必要的資料轉送。
# 擷取執行結果的部分資料
data = filtered_df.execute().fetch()
print(data)