全部產品
Search
文件中心

MaxCompute:情境實踐

更新時間:Aug 30, 2024

MaxCompute提供使用者自訂函數(UDF)及Python(PyODPS和MaxFrame)開發能力,本文為您介紹如何在MaxCompute UDF、PyODPS及MaxFrame作業開發中使用鏡像。

在SQL UDF開發中使用鏡像

以下以使用Pandas實現一個列求和的UDF為例,為您介紹如何在SQL UDF開發中使用鏡像。

  1. 編寫Python UDF指令碼,並將其儲存為sum_pandas.py檔案。指令碼樣本如下:

    from odps.udf import annotate
    import pandas as pd
    
    @annotate("string, string -> string")
    class SumColumns(object):
        def evaluate(self, arg1, arg2):
            # 將輸入參數轉換為pandas DataFrame
            df = pd.DataFrame({'col1': arg1.split(','), 'col2': arg2.split(',')})
    
            # 使用pandas進行資料處理操作
            # 這裡以計算兩列的和為例
            df['sum'] = df['col1'].astype(int) + df['col2'].astype(int)
    
            # 將處理結果轉換為字串並返回
            result = ','.join(df['sum'].astype(str).values)
            return result
  2. sum_pandas.py指令碼以資源形式上傳至MaxCompute專案空間,詳情請參見添加資源。命令樣本如下:

    ADD PY sum_pandas.py -f;
  3. 將已上傳的sum_pandas.py指令碼註冊為自訂函數SumColumns,具體操作請參見註冊函數。命令樣本如下:

    CREATE FUNCTION SumColumns AS 'sum_pandas.SumColumns' USING 'sum_pandas.py';
  4. 準備測試表testsum及測試資料。

    CREATE TABLE testsum (col1 string, col2 string);
    INSERT INTO testsum VALUES ('1,2,3','1,2,3'),('1,2,3','3,2,1'),('1,2,3','4,5,6');
  5. 調用UDF函數時通過Flag指定已存在的鏡像。

    set odps.sql.python.version=cp37;
    set odps.session.image = <鏡像名稱>;
    SELECT SumColumns(col1,col2) AS result FROM testsum;

    返回結果:

    +------------+
    | result     |
    +------------+
    | 2,4,6      |
    | 4,4,4      |
    | 5,7,9      |
    +------------+

在PyODPS開發中使用鏡像

以下以實現scipy包中的psi函數為例,為您介紹如何在PyODPS中使用鏡像。

  1. 準備測試表test_float_col及測試資料。

    CREATE TABLE test_float_col (col1 double);
    INSERT INTO test_float_col VALUES (3.75),(2.51);
  2. 編寫PyODPS代碼,計算psi(col1)的值,並儲存為psi_col.py檔案執行。程式碼範例如下:

    import os
    from odps import ODPS, options
    
    def my_psi(v):
        from scipy.special import psi
    
        return float(psi(v))
    
    # 如果 Project 開啟了 Isolation,下面的選項不是必需的
    options.sql.settings = {"odps.isolation.session.enable": True}
    
    o = ODPS(
          # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID,
          # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret,
          # 不建議直接使用AccessKey ID和 AccessKey Secret字串。
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
          project='your-default-project',
          endpoint='your-end-point'
    )
    
    df = o.get_table("test_float_col").to_df()
    # 直接執行並取得結果
    df.col1.map(my_psi).execute(image='scipy')
    # 儲存到另一張表
    df.col1.map(my_psi).persist("result_table", image='scipy')

    參數說明:

    • ALIBABA_CLOUD_ACCESS_KEY_ID:需將該環境變數設定為具備目標MaxCompute專案中待操作對象相關MaxCompute許可權的AccessKey ID。您可以進入AccessKey管理頁面擷取AccessKey ID。

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET:需將該環境變數設定為AccessKey ID對應的AccessKey Secret。

    • your-default-project:使用的MaxCompute專案名稱。您可以登入MaxCompute控制台,在左側導覽列選擇工作區>專案管理,查看MaxCompute專案名稱。

    • your-end-point:目標MaxCompute專案所在地區的Endpoint,可根據網路連接方式自行選擇,例如http://service.cn-chengdu.maxcompute.aliyun.com/api。詳情請參見Endpoint

  3. 查看結果表result_table。

    SELECT * FROM result_table

    返回結果:

    +------------+
    | col1       |
    +------------+
    | 1.1825373886117962 |
    | 0.7080484451910534 |
    +------------+

在MaxFrame開發中使用鏡像

以下以實現scipy包中的psi函數為例,為您介紹如何在MaxFrame作業中使用鏡像。

  1. 準備測試表test_float_col及測試資料。

    CREATE TABLE test_float_col (col1 double);
    INSERT INTO test_float_col VALUES (3.75),(2.51);
  2. 編寫MaxFrame代碼,計算psi(col1)的值,並儲存為psi_col.py檔案執行。程式碼範例如下:

    import os
    from odps import ODPS, options
    from maxframe.session import new_session
    import maxframe.dataframe as md
    
    
    from maxframe.config import options
    from maxframe import config
    
    # 引用內建scipy鏡像
    config.options.sql.settings = {
        "odps.session.image": "scipy"
    }
    def my_psi(v):
        from scipy.special import psi
        return float(psi(v))
    
    o = ODPS(
          # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID,
          # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret,
          # 不建議直接使用AccessKey ID和 AccessKey Secret字串。
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
          project='your-default-project',
          endpoint='your-end-point'
    )
    
    # 建立MaxFrame Session
    session = new_session(o)
    df = md.read_odps_table('test_float_col')
    
    # 直接執行並取得結果
    print(df.col1.map(my_psi).execute().fetch())

    參數說明:

    • ALIBABA_CLOUD_ACCESS_KEY_ID:需將該環境變數設定為具備目標MaxCompute專案中待操作對象相關MaxCompute許可權的AccessKey ID。您可以進入AccessKey管理頁面擷取AccessKey ID。

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET:需將該環境變數設定為AccessKey ID對應的AccessKey Secret。

    • your-default-project:使用的MaxCompute專案名稱。您可以登入MaxCompute控制台,在左側導覽列選擇工作區>專案管理,查看MaxCompute專案名稱。

    • your-end-point:目標MaxCompute專案所在地區的Endpoint,可根據網路連接方式自行選擇,例如http://service.cn-chengdu.maxcompute.aliyun.com/api。詳情請參見Endpoint

    返回結果:

    0    1.182537
    1    0.708048
    Name: col1, dtype: float64