すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:ローカル環境で PyODPS を使用する

最終更新日:Dec 03, 2025

PyODPS は、開発者が MaxCompute を使用してビッグデータ処理と分析を行うための Alibaba Cloud の Python SDK です。このトピックでは、ローカル環境で PyODPS を使用して、テーブル操作、データロード、SQL クエリの実行を行う方法について説明します。

前提条件

  • PyODPS がローカルにインストールされており、環境変数が設定されていること。

  • このドキュメントの例では、pyodps_iris テーブルを使用した基本操作をデモンストレーションします。このテーブルを準備するには、「使用例」をご参照のうえ、データセットをダウンロードし、pyodps_iris テーブルを作成してデータを書き込んでください。

手順

  1. Python エディターを開き、Python ファイルを作成します。

    説明

    Python エディターがローカルにインストールされていない場合は、.py 拡張子のファイルを作成できます。

  2. PyODPS タスクコードを開発します。

    ファイルを作成した後、以下のセクションを参照して、PyODPS の主な機能を示す簡単な例を確認できます。

    PyODPS の使用に関する詳細については、「基本操作の概要」および「DataFrame (非推奨)」をご参照ください。また、「PyODPS ノードを使用した Jieba 中国語単語分割」トピックで、簡単なエンドツーエンドの操作例を参照することもできます。

  3. Python ファイルを保存し、ローカルで実行します。

ODPS エントリポイントの初期化

重要

ODPS エントリポイントを手動で定義します。次のコードに例を示します。

import os
from odps import ODPS

o = ODPS(
    # (推奨) 環境変数が設定されていることを確認します。
    # ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数がご自身の AccessKey ID に設定されていることを確認します。
    access_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数がご自身の AccessKey Secret に設定されていることを確認します。
    secret_access_key=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    
    # (非推奨) 環境変数が設定されていない場合は、代わりに以下のフォーマットを使用します。
    # access_id='your-aliyun-access-key-id',
    # secret_access_key= 'your-aliyun-access-key-secret',
    
    project='your-default-project',
    endpoint='your-end-point',
)

パラメーター

説明

your-default-project

ご利用の MaxCompute プロジェクト名を入力します。MaxCompute コンソールにログインして、プロジェクトのリストを取得します。

your-end-point

ご利用のリージョンとネットワーク環境に対応するエンドポイントを入力します。

たとえば、中国 (杭州) リージョンのパブリックエンドポイントの場合、https://service.cn-hangzhou.maxcompute.aliyun.com/api を入力します。

重要

正しいネットワークタイプを選択していることを確認してください。そうでない場合、接続は失敗します。

これらの構成を完了すると、ローカル環境で PyODPS を使用して ODPS オブジェクトに対する基本操作を実行できます。これらの操作には、listgetexistcreatedelete が含まれます。PyODPS の使用に関する詳細については、「基本操作の概要」および「DataFrame (非推奨)」をご参照ください。

SQL の実行

  1. PyODPS ノードで SQL コマンドを実行します。

    PyODPS ノードでは、従来モードまたは MaxCompute クエリアクセラレーション (MCQA) を使用して SQL コマンドを実行できます。この機能は現在、データ定義言語 (DDL) およびデータ操作言語 (DML) コマンドをサポートしています。高速化クエリモード (MCQA) は、ジョブの結果を一時的なキャッシュに書き込みます。同じクエリジョブを再度実行すると、MaxCompute はキャッシュされた結果を返し、実行を高速化します。課金ルールに関する詳細については、「」または「コンピューティング費用 (従量課金)」をご参照ください。要件に応じて実行モードを選択できます。

    説明

    execute_sql()run_sql() などのメソッドをエントリポイントオブジェクトで使用して、すべてのタイプの SQL 文を実行することはできません。

    非 DDL または非 DML 文を呼び出すには、他のメソッドを使用する必要があります。たとえば、`CREATE TABLE` 文を呼び出すには create_table メソッドを使用し、API コマンドを呼び出すには run_xflow または execute_xflow メソッドを使用します。

    従来モードを使用した SQL コマンドの実行

    execute_sql()/run_sql() を使用して SQL コマンドを実行できます。以下のコードは一例です:

    create_table メソッドを使用して新しいテーブルを作成します。

    o.create_table('my_t', 'num bigint, id string', if_not_exists=True)

    execute_sql メソッドを使用して SQL クエリを実行します。

    result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3')
    with result.open_reader() as reader:
        for record in reader:
            print(record)
    
  2. PyODPS ノードで SQL 実行結果を読み取ります。

    open_reader() を使用して SQL コマンドの結果を読み取ります。詳細については、「SQL 実行結果の読み取り」をご参照ください。

PyODPS ノードでの SQL 関連操作に関する詳細については、「SQL」をご参照ください。

DataFrame

PyODPS は、データ処理に使用できる DataFrame API を提供します。DataFrame 操作のその他の例については、「DataFrame」をご参照ください。

  • 実行

    DataFrame を実行するには、即時実行メソッド (たとえば executepersist) を明示的に呼び出す必要があります。次のコードは、その例です。

    # 即時実行メソッドを呼び出して各レコードを処理し、pyodps_iris テーブル内の iris.sepalwidth が 3 未満のすべてのデータを出力します。
    from odps.df import DataFrame
    iris = DataFrame(o.get_table('pyodps_iris'))
    for record in iris[iris.sepalwidth < 3].execute():  
      print(record)
    
  • 詳細情報の出力

    デフォルトでは、ローカル環境の PyODPS ノードの実行プロセスは、Logview などの詳細情報を出力しません。Logview などの詳細情報を出力するには、options.verbose オプションを手動で設定する必要があります。

    from odps import options
    options.verbose = True
    

実行時パラメーター (ヒント) の設定

タスクの実行時パラメーターを設定するには、辞書である hints パラメーターを使用します。

o.execute_sql('SELECT * FROM pyodps_iris', hints={'odps.sql.mapper.split.size': 16}) # hints パラメーターを使用してランタイム パラメーターを設定

グローバルな SQL 設定を構成することもできます。構成が完了すると、関連する実行時パラメーターが各実行に追加されます。

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
# グローバル構成に基づいてヒントが追加されます。
o.execute_sql('SELECT * FROM pyodps_iris')

完全な例

  1. ローカルマシンに test-pyodps-local.py という名前のファイルを作成します。

  2. サンプルコードを記述します。

    import os
    from odps import ODPS
    
    o = ODPS(
        # (推奨) 環境変数が設定されていることを確認します。
        # ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数がご自身の AccessKey ID に設定されていることを確認します。
        access_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数がご自身の AccessKey Secret に設定されていることを確認します。
        secret_access_key=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    # フィールド名と型を直接指定して、my_new_table という名前の非パーティションテーブルを作成します。
    table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
    
    # 非パーティションテーブル my_new_table にデータを挿入します。
    records = [[111, 'aaa'],
               [222, 'bbb'],
               [333, 'ccc'],
               [444, 'Chinese']]
    o.write_table(table, records)
    
    # 非パーティションテーブル my_new_table からデータを読み取ります。
    for record in o.read_table(table):
        print(record[0], record[1])
    
    # SQL 文を実行してテーブルからデータを読み取ります。
    result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3;', hints={'odps.sql.allow.fullscan': 'true'})
    
    # リソースを解放するためにテーブルをドロップします。
    table.drop()
    
    print('Read data from the pyodps_iris table using open_reader:')
    
    # SQL 実行結果を読み取ります。
    with result.open_reader() as reader:
        for record in reader:
            print(record[0], record[1])
    
  3. Python コードを実行します。

    python test-pyodps-local.py

    出力は次のとおりです:

    111 aaa
    222 bbb
    333 ccc
    444 Chinese
    open_reader を使用して pyodps_iris テーブルからデータを読み取ります:
    4.9 3.0
    4.7 3.2
    4.6 3.1