本文介紹MaxFrame提供的包括Session、Input/Output、Execute及Fetch幾類特有API,用於在MaxFrame任務中便捷的進行資料處理。
Session
new_session
介面名稱:new_session,源碼請參見new_session。
new_session( session_id: str = None, default: bool = True, new: bool = True, odps_entry: Optional[ODPS] = None )功能描述:啟動一個MaxFrame任務會話。
入參
參數名稱
參數類型
是否必填
描述
session_id
String
否
工作階段識別項。
用於為新建立的會話指定一個唯一的標識符。若不指定,則MaxFrame會自動產生一個預設的標識符。
default
Boolean
否
是否將建立的Session作為預設Session。
預設值為True。
new
Boolean
否
是否建立新會話。
預設值為True,若設定為False,則根據session_id重用現有會話。
odps_entry
ODPS
是
ODPS入口對象。詳情請參見初始化ODPS入口。
傳回值
Session對象。
樣本
from maxframe import new_session from odps import ODPS # 使用MaxFrame相關帳號初始化ODPS o = ODPS( # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret, # 不建議直接使用AccessKey ID和 AccessKey Secret字串。 os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) # 初始化MaxFrame會話 session = new_session(odps_entry=o)
Input/Output
read_odps_table
介面名稱:read_odps_table,源碼請參見read_odps_table。
read_odps_table( table_name: Union[str, Table], partitions: Union[None, str, List[str]] = None, columns: Optional[List[str]] = None, index_col: Union[None, str, List[str]] = None, odps_entry: ODPS = None, string_as_binary: bool = None, append_partitions: bool = False )功能描述:從MaxCompute表中讀取資料並構建DataFrame。支援指定部分列作為索引,如果不指定,則會產生RangeIndex。
入參
參數名稱
參數類型
是否必填
描述
table_name
String/Table
是
待讀取的MaxCompute表名或Table對象。
partitions
String/List
否
待讀取的表分區或分區列表。
格式為:
<partition_name>=<partition_value>。若不指定,則讀取表中的所有分區資料。columns
List
否
待讀取的表列名List。
格式為:
<column1>, <column2>, ...。若不指定,則讀取表中的所有列(不包含分區列)資料。index_col
String/List
否
指定作為索引的列名或列名List。
odps_entry
ODPS
否
ODPS入口對象。詳情請參見初始化ODPS入口。
string_as_binary
Boolean
否
是否以二進位形式讀取字串資料。
append_partitions
Boolean
否
是否讀取分區列的資料。
預設值為False。若設定為True,在未指定
columns參數時,將讀取所有列(包含分區列)的資料。傳回值
DataFrame對象。
樣本
import maxframe.dataframe as md df = md.read_odps_table('BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', index_col='user_id', columns=['age', 'sex']) print(df.execute().fetch()) # 傳回值 user_id age sex 1 24 M 2 53 F 3 23 M 4 24 M 5 33 F ... ... ... 939 26 F 940 32 M 941 20 M 942 48 F 943 22 M
read_odps_query
介面名稱:read_odps_query,源碼請參見read_odps_query。
read_odps_query( query: str, odps_entry: ODPS = None, index_col: Union[None, str, List[str]] = None, string_as_binary: bool = None )功能描述:從MaxCompute查詢中讀取資料並構建DataFrame。支援指定部分列作為索引,如果不指定,則會產生RangeIndex。
入參
參數名稱
參數類型
是否必填
描述
query
String
是
待讀取的MaxCompute SQL語句。
odps_entry
ODPS
否
ODPS入口對象。詳情請參見初始化ODPS入口。
index_col
String/List
否
指定作為索引的列名或列名List。
string_as_binary
Boolean
否
是否以二進位形式讀取字串資料。
傳回值
DataFrame對象。
樣本
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`')
to_odps_table
介面名稱:to_odps_table,源碼請參見to_odps_table。
to_odps_table( table: Union[Table, str], partition: Optional[str] = None, partition_col: Union[None, str, List[str]] = None, overwrite: bool = False, unknown_as_string: Optional[bool] = None, index: bool = True, index_label: Union[None, str, List[str]] = None, lifecycle: Optional[int] = None )功能描述:將DataFrame對象寫入MaxCompute表。若MaxCompute中表不存在,則會自動建立。
入參
參數名稱
參數類型
是否必填
描述
table
String/Table
是
待寫入DataFrame的表名稱或Table對象。
partition
String
否
待寫入的分區。
例如
pt1=xxx, pt2=yyy。partition_col
String/List
否
指定DataFrame中作為分區列的列名或列名List。
overwrite
Boolean
否
當表或分區已經存在時,是否覆蓋資料。
預設值為False。
unknown_as_string
Boolean
否
對於無法識別的類型,是否作為String類型進行處理。
預設值為False。如果為True,DataFrame中的物件類型會被視為字串。可能引發錯誤。
index
Boolean
否
是否儲存索引。
預設值為True。
index_label
String/List
否
為索引指定的列名稱。
索引列名稱由index_label參數指定。如果不指定,則使用系統預設名稱(index)。僅具有一級索引時,名稱將預設為index,對於多級索引,名稱將為level_x,其中x是索引的層級。
lifecycle
int
否
指定輸出表的生命週期。
取值為正整數,若表已經存在,則該設定會覆蓋原參數。
傳回值
DataFrame對象。
樣本
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`', index_col='user_id')) ouput_df = df.to_odps_table('output_table', lifecycle = 7)
to_odps_model
介面名稱:to_odps_model。
to_odps_model( model_name: str, model_version: str = None, schema: str = None, project: str = None, description: Optional[str] = None, version_description: Optional[str] = None, create_model: bool = True, set_default_version: bool = False )功能描述:將MaxFrame作業訓練產生的Xgboost模型儲存成MaxCompute的模型對象。
入參
參數名稱
參數類型
是否必填
描述
model_name
String
是
模型名稱。
當
project和schema已在作業中單獨指定,則僅需指定模型名;否則需要按照project.schema.model格式指定模型名稱。
model_version
String
否
模型版本。
若未指定,則由系統自動產生。
schema
String
否
模型歸屬的schema。
若未指定,則預設schema為“default”。
project
String
否
模型歸屬的專案。
description
String
否
模型描述。
version_description
String
否
模型版本描述。
create_model
Boolean
否
是否在模型不存在時自動建立。
預設值:True。
set_default_version
Boolean
否
是否將目前的版本設為模型的預設版本。
預設值:False。
傳回值
Scalar 對象,調用
.execute()後觸發模型儲存操作。樣本
# 訓練 XGBoost 模型 from maxframe.learn.contrib.xgboost import XGBClassifier X_df = md.DataFrame(X, columns=cols) clf = XGBClassifier(n_estimators=10) clf.fit(X_df, y) # 儲存模型至 MaxCompute clf.to_odps_model( model_name="my_model", # 指定專案和schema,則model_name的樣本格式如下 # model_name="project.schema.my_model" model_version="version1" ).execute()
Execute
execute
介面名稱:execute,源碼請參見execute
execute( session: SessionType = None )功能描述:執行execute方法,以啟動資料處理任務。
入參
參數名稱
參數類型
是否必填
描述
session
Session
否
用於執行資料處理任務的會話。建立方式請參見new_session。
若不指定,則使用通過new_session初始化的全域會話。
傳回值
無。
樣本
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users', index_col='user_id')) df.execute()
Fetch
fetch
介面名稱:fetch,源碼請參見fetch。
fetch( session: SessionType = None )功能描述:擷取結果資料到本地環境。
入參
參數名稱
參數類型
是否必填
描述
session
Session
否
用於擷取結果資料的會話。建立方式請參見new_session。
若不指定,則使用通過new_session初始化的全域會話。
傳回值
Pandas的DataFrame或Series。
樣本
import maxframe.dataframe as md df = md.read_odps_query('select user_id, age, sex FROM `BIGDATA_PUBLIC_DATASET.data_science.maxframe_ml_100k_users`', index_col='user_id') res = df.execute().fetch() print(res) # 返回結果 user_id age sex 1 24 M 2 53 F 3 23 M 4 24 M 5 33 F ... ... .. 939 26 F 940 32 M 941 20 M 942 48 F 943 22 M [943 rows x 2 columns]