すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:DataWorks での PyODPS の使用

最終更新日:Apr 08, 2025

PyODPS は、Python 用の MaxCompute SDK です。DataWorks コンソールで PyODPS ノードを作成して実行できます。このトピックでは、DataWorks で PyODPS を使用する場合の制限事項と、PyODPS ノードの作成方法について説明します。また、PyODPS の使用方法の例も示します。

制限事項

  • 使用制限

    メモリ使用量が上限を超えると、システムは [強制終了] を報告し、関連するプロセスを終了します。PyODPS ノードからデータをダウンロードして DataWorks で処理するのではなく、データ処理タスクを MaxCompute にコミットして分散実行することをお勧めします。2 つの方法の比較については、「概要」の「注意事項」セクションをご参照ください。

  • パッケージの制限

    • matplotlib などのパッケージがないため、PyODPS ノードの機能は次の点で制限される可能性があります。

      • DataFrame の plot 関数の使用に影響します。

      • DataFrame ユーザー定義関数(UDF)は、DataFrame UDF が MaxCompute にコミットされた後にのみ使用できます。Python サンドボックスの要件に基づいて UDF を実行するには、純粋な Python ライブラリと NumPy ライブラリのみを使用できます。pandas などの他のサードパーティライブラリは使用できません。

      • ただし、DataWorks にプリインストールされている NumPy ライブラリと pandas ライブラリを使用して、非 UDF を実行することはできます。バイナリコードを含むサードパーティパッケージはサポートされていません。

    • DataWorks の PyODPS ノードは、Python パッケージ atexit をサポートしていません。try-finally を使用して関連機能を有効にすることができます。

  • 読み取ることができるデータレコード数の制限

    デフォルトでは、DataWorks の PyODPS ノードでは、options.tunnel.use_instance_tunnel パラメーターは False に設定されています。これは、最大 10,000 件のデータレコードを読み取ることができることを示します。さらに多くのデータレコードを読み取る場合は、options.tunnel.use_instance_tunnel パラメーターを True に設定して、InstanceTunnel をグローバルに有効にする必要があります。

手順

  1. PyODPS ノードを作成します。

    DataWorks の [DataStudio] ページに移動して、PyODPS ノードを作成します。PyODPS ノードは、PyODPS 2 ノードと PyODPS 3 ノードに分類されます。

    • PyODPS 2 の基盤となる Python バージョンは Python 2 です。

    • PyODPS 3 の基盤となる Python バージョンは Python 3 です。

    使用する Python のバージョンに基づいて PyODPS ノードを作成できます。PyODPS ノードの作成方法の詳細については、「PyODPS 2 タスクを開発する」および「PyODPS 3 タスクを開発する」をご参照ください。新建节点

  2. PyODPS ノードのコードを開発します。

    PyODPS ノードを作成したら、以下のセクションの例を参照して、PyODPS の主な機能に関する情報を取得します。

    PyODPS の使用方法の詳細については、「基本操作の概要」および「DataFrame の概要」をご参照ください。PyODPS ノードで単純なエンドツーエンド操作を実行する方法の詳細については、「PyODPS ノードを使用して Jieba に基づいて中国語テキストをセグメント化する」をご参照ください。

  3. スケジューリングパラメーターを構成します。次に、ノードを保存、コミット、およびデプロイします。こうすることで、ノードは定期的に実行できます。

MaxCompute エントリポイント

DataWorks の各 PyODPS ノードには、MaxCompute エントリポイントであるグローバル変数 odps または o が含まれています。MaxCompute エントリポイントを指定する必要はありません。ステートメントの例:

# pyodps_iris テーブルが存在するかどうかを確認します。
print(o.exist_table('pyodps_iris'))

True が返された場合、pyodps_iris テーブルは存在します。

説明

エントリオブジェクト o で使用される認証情報は、MaxCompute へのアクセスのみをサポートし、他のクラウドサービスへのアクセスには使用できません。PyODPS ノードの実行中、DataWorks はこの認証情報のみを提供します。o.from_global メソッドなど、MaxCompute エントリオブジェクトのメソッドを使用して追加の認証情報を取得することはできません

SQL 文の実行

一般的な機能

  • PyODPS ノードで SQL 文を実行します。たとえば、execute_sql()/run_sql() を使用して SQL 文を実行できます。データ定義言語(DDL)およびデータ操作言語(DML)の SQL 文のみを実行できます。

    説明

    MaxCompute クライアントで実行できる特定の SQL 文は、MaxCompute エントリオブジェクトの execute_sql() メソッドと run_sql() メソッドを使用して実行できない場合があります。非 DDL 文または非 DML 文を実行するには、他のメソッドを使用する必要があります。たとえば、GRANT 文または REVOKE 文を実行するには run_security_query メソッドを使用し、API 操作を呼び出すには run_xflow メソッドまたは execute_xflow メソッドを使用する必要があります。

  • PyODPS ノードから SQL 実行結果を読み取ります。たとえば、open_reader() メソッドを使用して SQL 実行結果を読み取ることができます。

PyODPS ノードでの SQL 関連操作の詳細については、「SQL」をご参照ください。

データ形式とレコード数の制限

歴史的な互換性の理由から、DataWorks ではデフォルトで インスタンストンネル は有効になっていません。この場合、Result インターフェイスが呼び出されて instance.open_reader が実行され、最大 10,000 件のデータレコードを読み取ることができ、複雑なデータ型のサポートにも問題があります。プロジェクトで データ保護メカニズムが有効になっておらず、すべてのデータを反復的にフェッチする必要がある場合、または Arrays などの複雑なデータ型フィールドを読み取る必要がある場合は、インスタンストンネル を有効にし、limit を無効にする必要があります。

  • コードの実行中に limit を無効にします。

    次の文を実行して、インスタンストンネルをグローバルに有効にし、limit を削除できます。

    options.tunnel.use_instance_tunnel = True
    options.tunnel.limit_instance_tunnel = False # 読み取ることができるデータレコード数の制限を削除します。
    
    with instance.open_reader() as reader:
    # インスタンストンネルを使用してすべてのデータを読み取ります。
    # reader.count を使用してデータレコード数を取得できます。

  • 現在のリーダーに対してのみ limit を無効にします。

    tunnel=Trueopen_reader() メソッドに追加して、現在の InstanceTunnelopen_reader() メソッドで有効にすることもできます。limit=False を open_reader() メソッドに追加して、現在の open_reader() メソッドで読み取ることができるデータレコード数の limit を削除することもできます。

    with instance.open_reader(tunnel=True, limit=False) as reader:
    # 現在の open_reader() メソッドはインスタンストンネルを使用して呼び出され、すべてのデータを読み取ることができます。

インスタンストンネルとデータ読み取りの制限の詳細については、「SQL 文の実行結果を取得する」をご参照ください。

DataFrame

  • 実行方法

    DataWorks で DataFrame に対して操作を実行するには、即時実行メソッドexecutepersist など、すぐに実行されるメソッド を明示的に呼び出す必要があります。次のコードは例を示しています。

    # すぐ実行されるメソッドを呼び出して各データレコードを処理し、pyodps_iris テーブルで iris.sepalwidth が 3 未満のすべてのデータレコードを表示します。
    from odps.df import DataFrame
    iris = DataFrame(o.get_table('pyodps_iris'))
    for record in iris[iris.sepalwidth < 3].execute():
        print(record)
  • 詳細の表示

    デフォルトでは、DataWorks で options.verbose が有効になっています。これは、Logview などの DataWorks 内の PyODPS ノードの 詳細が表示されることを示します。このオプションを構成して、Logview などの詳細を表示するかどうかを指定できます。

DataFrame 操作の詳細については、「DataFrame(非推奨)」をご参照ください。

スケジューリングパラメーターの取得

DataWorks で PyODPS ノードを使用してコードを開発する場合、スケジューリングパラメーターも使用できます。たとえば、スケジューリングパラメーターを使用して、ノードのデータタイムスタンプを取得できます。DataWorks の PyODPS ノードと SQL ノードは、同じ方法でスケジューリングパラメーターを指定します。ただし、スケジューリングパラメーターは、PyODPS ノードと SQL ノードのコードでは異なる方法で参照されます。

  • ${param_name} などの文字列は、SQL ノードのコードで使用されます。

  • PyODPS ノードのコードが実行される前に、args という名前の辞書がグローバル変数に追加されます。これにより、コードへの悪影響を防ぎます。コードは、コードで ${param_name} をスケジューリングパラメーターに置き換える代わりに、args[param_name] メソッドを使用してスケジューリングパラメーターの値を取得します。

たとえば、DataWorks の PyODPS ノードの [スケジューリング構成] タブで、ds=${yyyymmdd}[基本プロパティ] セクションの [パラメーター] フィールドにスケジューリングパラメーター を指定できます。次に、ノードのコードで次のコマンドを指定して、パラメーター値を取得できます。

  • ds パラメーターの値を取得します。

    print('ds=' + args['ds'])
    # ds パラメーターで指定された時刻を取得します(例: ds=20161116)。
  • ds=${yyyymmdd} で指定されたパーティションのデータを取得します。

    o.get_table('table_name').get_partition('ds=' + args['ds'])
    # table_name で指定されたテーブルの ds で指定されたパーティションからデータを取得します。

スケジューリングパラメーターの詳細については、「スケジューリングパラメーターを構成して使用する」をご参照ください。

hints パラメーターの構成

hints パラメーターを使用して、ランタイムパラメーターを構成できます。hints パラメーターの値は DICT 型です。

o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

sql.settings パラメーターをグローバルに構成できます。関連するランタイムパラメーターは、実行ごとに自動的に追加されます。

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') # hints パラメーターはグローバル設定に基づいて自動的に構成されます。

サードパーティパッケージの使用

次の表に、DataWorks ノードにプリインストールされているサードパーティパッケージと、ノードのパッケージのバージョンを示します。

パッケージ名

Python 2 ノードのパッケージバージョン

Python 3 ノードのパッケージバージョン

requests

2.11.1

2.26.0

numpy

1.16.6

1.18.1

pandas

0.24.2

1.0.5

scipy

0.19.0

1.3.0

scikit_learn

0.18.1

0.22.1

pyarrow

0.16.0

2.0.0

lz4

2.1.4

3.1.10

zstandard

0.14.1

0.17.0

使用したいサードパーティパッケージが上記の表に含まれていない場合は、DataWorks が提供する load_resource_package メソッドを使用して、MaxCompute リソースからパッケージをダウンロードできます。pyodps-pack を使用してパッケージを生成した後、load_resource_package メソッドを使用してサードパーティパッケージを読み込むことができます。その後、パッケージの内容を DataWorks にインポートできます。pyodps-pack の使用方法の詳細については、「PyODPS 用のサードパーティパッケージを生成する」および「PyODPS ノードでサードパーティパッケージを参照する」をご参照ください。

説明

Python 2 ノードのパッケージを生成する場合は、パッケージの生成時に --dwpy27 パラメーターを pyodps-pack に追加します。

例:

  1. 次のコマンドを実行して、IP アドレスをパッケージ化します。

    pyodps-pack -o ipaddress-bundle.tar.gz ipaddress
  2. ipaddress-bundle.tar.gz パッケージをリソースとしてアップロードして送信した後、次の方法で PyODPS 3 ノードのパッケージを使用できます。

    load_resource_package("ipaddress-bundle.tar.gz")
    import ipaddress

ダウンロードしたパッケージの合計サイズは 100 MB を超えることはできません。プリインストールされたパッケージのパッケージ化操作をスキップする場合は、パッケージ化中に pyodps-pack が提供する --exclude パラメーターを使用できます。たとえば、次のパッケージ化方法は、DataWorks 環境に存在する NumPy パッケージと pandas パッケージを除外します。

pyodps-pack -o bundle.tar.gz --exclude numpy --exclude pandas <YOUR_PACKAGE>

別のアカウントを使用して MaxCompute にアクセスする

特定のシナリオでは、別の Alibaba Cloud アカウントを使用して、現在の Alibaba Cloud アカウント内の MaxCompute プロジェクトにアクセスしたい場合があります。この場合、ODPS エントリオブジェクトの as_account メソッドを使用して、新しいアカウントを使用するエントリオブジェクトを作成できます。デフォルトでは、新しいエントリオブジェクトは、システムが提供する o インスタンスとは無関係です。

重要

as_account メソッドは PyODPS 0.11.3 以降でサポートされています。DataWorks ノードに PyODPS 0.11.3 以降を構成していない場合、as_account メソッドを使用することはできません。

手順

  1. 新しいアカウントにプロジェクトに対する権限を付与します。詳細については、「付録: 別のアカウントに権限を付与する」をご参照ください。

  2. as_account メソッドを使用して新しいアカウントに切り替え、PyODPS ノードでアカウントのエントリオブジェクトを作成します。

    import os
    # 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID を Alibaba Cloud アカウントの AccessKey ID に設定します。
    # 環境変数 ALIBABA_CLOUD_ACCESS_KEY_SECRET を Alibaba Cloud アカウントの AccessKey シークレットに設定します。
    # AccessKey ID または AccessKey シークレットを直接使用しないことをお勧めします。
    new_odps = o.as_account(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
    )
  3. アカウントが切り替えられているかどうかを確認します。

    現在のユーザーに関する情報をクエリします。実行するコードに次のステートメントを追加します。返された結果のユーザー情報が新しいアカウントの ID である場合、新しいアカウントを使用して MaxCompute にアクセスしています。

    print(new_odps.get_project().current_user)
    説明

    new_odps: 新しいアカウントのエントリオブジェクト。

  1. テーブルを作成し、テーブルにデータをインポートします。

    1. iris.datairis からデータセット iris.csv をダウンロードし、iris.data を に名前変更します。

    2. pyodps_iris という名前のテーブルを作成し、データセット iris.csv をテーブルにアップロードします。詳細については、「テーブルを作成してデータをアップロードする」をご参照ください。

      ステートメントの例:

      CREATE TABLE if not exists pyodps_iris
      (
      sepallength  DOUBLE comment 'がく片の長さ (cm)',
      sepalwidth   DOUBLE comment 'がく片の幅 (cm)',
      petallength  DOUBLE comment '花びらの長さ (cm)',
      petalwidth   DOUBLE comment '花びらの幅 (cm)',
      name         STRING comment 'タイプ'
      );
  2. ODPS SQL ノードを作成し、ノードに対する権限を新しいアカウントに付与します。詳細については、「付録: 別のアカウントに権限を付与する」をご参照ください。

  3. PyODPS ノードを作成し、新しいアカウントに切り替えます。詳細については、「PyODPS 3 タスクを開発する」をご参照ください。

    from odps import ODPS
    import os
    import sys
    
    # 環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID を Alibaba Cloud アカウントの AccessKey ID に設定します。
    # 環境変数 ALIBABA_CLOUD_ACCESS_KEY_SECRET を Alibaba Cloud アカウントの AccessKey シークレットに設定します。
    # AccessKey ID または AccessKey シークレットを直接使用しないことをお勧めします。
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'] = '<accesskey id>'
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'] = '<accesskey secret>'
    od = o.as_account(os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
             os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
            )
    iris = DataFrame(od.get_table('pyodps_iris'))
    # WHERE 句に基づいて結果を返します。
    with od.execute_sql('select * from pyodps_iris WHERE sepallength > 5 ').open_reader() as reader4:
        print(reader4.raw)
        for record in reader4:
            print(record["sepallength"],record["sepalwidth"],record["petallength"],record["petalwidth"],record["name"])
    # 現在のユーザーの ID を返します。
    print(od.get_project().current_user)
  4. コードを実行して結果を表示します。

    PyODPS 0.11.4.post0 を使用してユーザースクリプトを実行しています。
    
    "sepallength","sepalwidth","petallength","petalwidth","name"
    /* 中略 */
    <User 139xxxxxxxxxxxxx>

診断

コードの実行中に応答が返されず、出力が返されない場合は、コード ヘッダーに次のコメントを追加できます。DataWorks は 30 秒ごとにすべてのスレッドのスタックを返します。

# -*- dump_traceback: true -*-  // スタックトレースを出力する
説明

このメソッドは、バージョン 0.11.4.1 以降の PyODPS 3 ノードに適しています。

表示 PyODPS バージョン

Python コードを実行して PyODPS バージョンをクエリできます。また、PyODPS ノードの実行時ログで PyODPS バージョンを表示することもできます。

  • PyODPS ノードでコードを実行して、PyODPS バージョンをクエリします。

    # 次のコードを実行します。
    import odps; print(odps.__version__)
    
    # 次の結果が返されます。
    0.11.2.3
  • PyODPS ノードの実行時ログで PyODPS バージョンを表示します。次の図は例を示しています。image.png

付録: 別のアカウントに権限を付与する

DataWorks で現在の Alibaba Cloud アカウント内のプロジェクトとテーブルに別の Alibaba Cloud アカウントを使用してアクセスする場合、DataWorks で ODPS SQL ノードを作成し、次のコマンドを実行して必要な権限をアカウントに付与する必要があります。ODPS SQL ノードの作成方法の詳細については、「ODPS SQL ノードを作成する」をご参照ください。権限の詳細については、「ユーザーと権限」をご参照ください。

-- 使用する Alibaba Cloud アカウントをプロジェクトに追加します。
add user ALIYUN$<account_name>;

-- プロジェクトに対する CreateInstance 権限をアカウントに付与します。
grant CreateInstance on project <project_name> to USER ALIYUN$<account_name>;

-- テーブルに対する Describe 権限と Select 権限をアカウントに付与します。
grant Describe, Select on table <table_name> to USER ALIYUN$<account_name>;

-- 承認結果を表示します。
show grants for ALIYUN$<account_name>;

次の図は、結果の例を示しています。image.png

付録: サンプルデータ

DataWorks コンソールで pyodps_iris という名前のテーブルを作成し、テーブルにデータをインポートできます。テーブルを作成してデータをインポートする方法の詳細については、「PyODPS ノードを使用して特定の条件に基づいてデータをクエリする」のステップ 1 をご参照ください。サンプルでは pyodps_iris テーブルを使用します。

参照資料

DataWorks データ開発インターフェイスで、過去 3 日間に実行したすべてのタスク レコード を表示し、実行中のタスクを停止し、[実行時ログ] から SQL 文を一時ファイルとして保存できます。詳細については、「操作履歴を表示する」をご参照ください。