すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:シナリオの実践

最終更新日:Jan 08, 2025

MaxComputeは、ユーザー定義関数 (UDF) と、PyODPSやMaxFrameなどのPython開発機能を提供します。 このトピックでは、MaxCompute UDF、PyODPS、およびMaxFrame開発ジョブ内でイメージを使用する方法について説明します。

SQL UDF開発でのイメージの使用

次の例では、列を合計するUDFでPandasを使用します。

  1. Python UDFスクリプトを記述し、sum_pandas.pyとして保存します。 次のサンプルスクリプトを以下に示します。

    from odps.udf import annotate
    import pandas as pd
    
    @annotate("string, string -> string")
    class SumColumns(object):
        def evaluate(self, arg1, arg2):
            # Convert input parameters to pandas DataFrame
            df = pd.DataFrame({'col1': arg1.split(','), 'col2': arg2.split(',')})
    
            # Perform data processing operations by using pandas
            # Calculate the sum of two columns as an example
            df['sum'] = df['col1'].astype(int) + df['col2'].astype(int)
    
            # Convert the processing result to a string and return
            result = ','.join(df['sum'].astype(str).values)
            return result
  2. sum_pandas.pyスクリプトをリソースとしてMaxComputeプロジェクトにアップロードします。 詳細については、「リソースの追加」をご参照ください。 次のコマンドを使用します。

    ADD PY sum_pandas.py -f;
  3. sum_pandas.pyスクリプトをSumColumns UDFとして登録します。 詳細については、「UDFの作成」をご参照ください。 次のコマンドを使用します。

    CREATE FUNCTION SumColumns AS 'sum_pandas.SumColumns' USING 'sum_pandas.py';
  4. 対応するテストデータでテストテーブルtestsumを設定します。

    CREATE TABLE testsum (col1 string, col2 string);
    INSERT INTO testsum VALUES ('1,2,3','1,2,3'),('1,2,3','3,2,1'),('1,2,3','4,5,6');
  5. UDFを呼び出すときにFlagパラメーターを使用してイメージを指定します。

    set odps.sql.python.version=cp37;
    set odps.session.image = ;
    SELECT SumColumns(col1,col2) AS result FROM testsum;

    期待される結果:

    +------------+
    | result     |
    +------------+
    | 2,4,6      |
    | 4,4,4      |
    | 5,7,9      |
    +------------+

PyODPS開発でのイメージの使用

次の例では、scipyパッケージからpsi関数を実装します。

  1. テストテーブルtest_float_colを準備し、テストデータを挿入します。

    CREATE TABLE test_float_col (col1 double);
    INSERT INTO test_float_col VALUES (3.75),(2.51);
  2. psi(col1) を計算するPyODPSコードを記述し、psi_col.pyとして保存します。 以下のサンプルコードを示します。

    import os
    from odps import ODPS, options
    
    def my_psi(v):
        from scipy.special import psi
    
        return float(psi(v))
    
    # If the project enables isolation, the following option is not required
    options.sql.settings = {"odps.isolation.session.enable": True}
    
    o = ODPS(
          # Ensure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set to your AccessKey ID,
          # and the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set to your AccessKey secret.
          # It is not recommended to directly use the AccessKey ID and AccessKey secret strings.
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
          project='your-default-project',
          endpoint='your-end-point'
    )
    
    df = o.get_table("test_float_col").to_df()
    # Execute directly and get the result
    df.col1.map(my_psi).execute(image='scipy')
    # Save to another table
    df.col1.map(my_psi).persist("result_table", image='scipy')

    パラメータの説明:

    • ALIBABA_CLOUD_ACCESS_KEY_ID: この環境変数を、MaxComputeプロジェクトに必要なMaxCompute権限を持つAccessKey IDに設定します。 AccessKey IDは、AccessKey管理ページから取得できます。

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET: この環境変数を、AccessKey IDに対応するAccessKeyシークレットに設定します。

    • your-default-project: MaxComputeプロジェクトの名前。 プロジェクト名を表示するには、MaxComputeコンソールにログインし、左側のナビゲーションウィンドウで [ワークスペース] > [プロジェクト] を選択します。

    • your-end-point: MaxComputeプロジェクトが存在するリージョンのエンドポイントです。 ネットワーク接続方法に基づいてエンドポイントを選択できます。 たとえば、http://service.cn-chengdu.maxcompute.aliyun.com/api 。 詳細については、「エンドポイント」をご参照ください。

  3. 結果をresult_tableテーブルに表示します。

    SELECT * FROM result_table

    期待される結果:

    +----------------------+
    | col1                 |
    +----------------------+
    | 1.1825373886117962   |
    | 0.7080484451910534   |
    +----------------------+

MaxFrame開発でのイメージの使用

次の例では、scipyパッケージからpsi関数を実装します。

  1. テストテーブルtest_float_colを作成し、テストデータを挿入します。

    CREATE TABLE test_float_col (col1 double);
    INSERT INTO test_float_col VALUES (3.75),(2.51);
  2. psi(col1) を計算するためのMaxFrameコードを記述し、psi_col.pyとして保存します。 以下のサンプルコードを示します。

    import os
    from odps import ODPS, options
    from maxframe.session import new_session
    import maxframe.dataframe as md
    
    from maxframe.config import options
    from maxframe import config
    
    # Use the built-in scipy image
    config.options.sql.settings = {
        "odps.session.image": "scipy"
    }
    def my_psi(v):
        from scipy.special import psi
        return float(psi(v))
    
    o = ODPS(
          # Ensure that the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable is set to your AccessKey ID,
          # and the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable is set to your AccessKey secret.
          # It is not recommended to directly use the AccessKey ID and AccessKey secret strings.
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
          project='your-default-project',
          endpoint='your-end-point'
    )
    
    # Create a MaxFrame session
    session = new_session(o)
    df = md.read_odps_table('test_float_col')
    
    # Execute and get the result
    print(df.col1.map(my_psi).execute().fetch()

    パラメータの説明:

    • ALIBABA_CLOUD_ACCESS_KEY_ID: この環境変数を、MaxComputeプロジェクトに必要なMaxCompute権限を持つAccessKey IDに設定します。 AccessKey IDは、AccessKey管理ページから取得できます。

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET: この環境変数を、AccessKey IDに対応するAccessKeyシークレットに設定します。

    • your-default-project: MaxComputeプロジェクトの名前。 プロジェクト名を表示するには、MaxComputeコンソールにログインし、左側のナビゲーションウィンドウで [ワークスペース] > [プロジェクト] を選択します。

    • your-end-point: MaxComputeプロジェクトが存在するリージョンのエンドポイントです。 ネットワーク接続方法に基づいてエンドポイントを選択できます。 たとえば、http://service.cn-chengdu.maxcompute.aliyun.com/api 。 詳細については、「エンドポイント」をご参照ください。

    期待される結果:

    0    1.182537
    1    0.708048
    Name: col1, dtype: float64