PyODPS は、開発者が MaxCompute を使用してビッグデータ処理と分析を行うための Alibaba Cloud の Python SDK です。このトピックでは、ローカル環境で PyODPS を使用して、テーブル操作、データロード、SQL クエリの実行を行う方法について説明します。
前提条件
手順
Python エディターを開き、Python ファイルを作成します。
説明Python エディターがローカルにインストールされていない場合は、
.py拡張子のファイルを作成できます。PyODPS タスクコードを開発します。
ファイルを作成した後、以下のセクションを参照して、PyODPS の主な機能を示す簡単な例を確認できます。
PyODPS の使用に関する詳細については、「基本操作の概要」および「DataFrame (非推奨)」をご参照ください。また、「PyODPS ノードを使用した Jieba 中国語単語分割」トピックで、簡単なエンドツーエンドの操作例を参照することもできます。
Python ファイルを保存し、ローカルで実行します。
ODPS エントリポイントの初期化
ALIBABA_CLOUD_ACCESS_KEY_ID と ALIBABA_CLOUD_ACCESS_KEY_SECRET の環境変数が設定されていることを確認してください。
(非推奨) 環境変数が設定されていない場合は、キーを明示的に指定できます。そのためには、access_id と secret_access_key を取得してください。
ODPS エントリポイントを手動で定義します。次のコードに例を示します。
import os
from odps import ODPS
o = ODPS(
# (推奨) 環境変数が設定されていることを確認します。
# ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数がご自身の AccessKey ID に設定されていることを確認します。
access_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数がご自身の AccessKey Secret に設定されていることを確認します。
secret_access_key=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
# (非推奨) 環境変数が設定されていない場合は、代わりに以下のフォーマットを使用します。
# access_id='your-aliyun-access-key-id',
# secret_access_key= 'your-aliyun-access-key-secret',
project='your-default-project',
endpoint='your-end-point',
)パラメーター | 説明 |
your-default-project | ご利用の MaxCompute プロジェクト名を入力します。MaxCompute コンソールにログインして、プロジェクトのリストを取得します。 |
your-end-point | ご利用のリージョンとネットワーク環境に対応するエンドポイントを入力します。 たとえば、中国 (杭州) リージョンのパブリックエンドポイントの場合、 重要 正しいネットワークタイプを選択していることを確認してください。そうでない場合、接続は失敗します。 |
これらの構成を完了すると、ローカル環境で PyODPS を使用して ODPS オブジェクトに対する基本操作を実行できます。これらの操作には、list、get、exist、create、delete が含まれます。PyODPS の使用に関する詳細については、「基本操作の概要」および「DataFrame (非推奨)」をご参照ください。
SQL の実行
PyODPS ノードで SQL コマンドを実行します。
PyODPS ノードでは、従来モードまたは MaxCompute クエリアクセラレーション (MCQA) を使用して SQL コマンドを実行できます。この機能は現在、データ定義言語 (DDL) およびデータ操作言語 (DML) コマンドをサポートしています。高速化クエリモード (MCQA) は、ジョブの結果を一時的なキャッシュに書き込みます。同じクエリジョブを再度実行すると、MaxCompute はキャッシュされた結果を返し、実行を高速化します。課金ルールに関する詳細については、「」または「コンピューティング費用 (従量課金)」をご参照ください。要件に応じて実行モードを選択できます。
説明execute_sql()やrun_sql()などのメソッドをエントリポイントオブジェクトで使用して、すべてのタイプの SQL 文を実行することはできません。非 DDL または非 DML 文を呼び出すには、他のメソッドを使用する必要があります。たとえば、`CREATE TABLE` 文を呼び出すには
create_tableメソッドを使用し、API コマンドを呼び出すにはrun_xflowまたはexecute_xflowメソッドを使用します。従来モードを使用した SQL コマンドの実行
execute_sql()/run_sql() を使用して SQL コマンドを実行できます。以下のコードは一例です:
create_tableメソッドを使用して新しいテーブルを作成します。o.create_table('my_t', 'num bigint, id string', if_not_exists=True)execute_sqlメソッドを使用して SQL クエリを実行します。result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3') with result.open_reader() as reader: for record in reader: print(record)PyODPS ノードで SQL 実行結果を読み取ります。
open_reader() を使用して SQL コマンドの結果を読み取ります。詳細については、「SQL 実行結果の読み取り」をご参照ください。
PyODPS ノードでの SQL 関連操作に関する詳細については、「SQL」をご参照ください。
DataFrame
PyODPS は、データ処理に使用できる DataFrame API を提供します。DataFrame 操作のその他の例については、「DataFrame」をご参照ください。
実行
DataFrame を実行するには、即時実行メソッド (たとえば
executeやpersist) を明示的に呼び出す必要があります。次のコードは、その例です。# 即時実行メソッドを呼び出して各レコードを処理し、pyodps_iris テーブル内の iris.sepalwidth が 3 未満のすべてのデータを出力します。 from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepalwidth < 3].execute(): print(record)詳細情報の出力
デフォルトでは、ローカル環境の PyODPS ノードの実行プロセスは、Logview などの詳細情報を出力しません。Logview などの詳細情報を出力するには、
options.verboseオプションを手動で設定する必要があります。from odps import options options.verbose = True
実行時パラメーター (ヒント) の設定
タスクの実行時パラメーターを設定するには、辞書である hints パラメーターを使用します。
o.execute_sql('SELECT * FROM pyodps_iris', hints={'odps.sql.mapper.split.size': 16}) # hints パラメーターを使用してランタイム パラメーターを設定グローバルな SQL 設定を構成することもできます。構成が完了すると、関連する実行時パラメーターが各実行に追加されます。
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
# グローバル構成に基づいてヒントが追加されます。
o.execute_sql('SELECT * FROM pyodps_iris')
完全な例
ローカルマシンに
test-pyodps-local.pyという名前のファイルを作成します。サンプルコードを記述します。
import os from odps import ODPS o = ODPS( # (推奨) 環境変数が設定されていることを確認します。 # ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数がご自身の AccessKey ID に設定されていることを確認します。 access_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数がご自身の AccessKey Secret に設定されていることを確認します。 secret_access_key=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point', ) # フィールド名と型を直接指定して、my_new_table という名前の非パーティションテーブルを作成します。 table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True) # 非パーティションテーブル my_new_table にデータを挿入します。 records = [[111, 'aaa'], [222, 'bbb'], [333, 'ccc'], [444, 'Chinese']] o.write_table(table, records) # 非パーティションテーブル my_new_table からデータを読み取ります。 for record in o.read_table(table): print(record[0], record[1]) # SQL 文を実行してテーブルからデータを読み取ります。 result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3;', hints={'odps.sql.allow.fullscan': 'true'}) # リソースを解放するためにテーブルをドロップします。 table.drop() print('Read data from the pyodps_iris table using open_reader:') # SQL 実行結果を読み取ります。 with result.open_reader() as reader: for record in reader: print(record[0], record[1])Python コードを実行します。
python test-pyodps-local.py出力は次のとおりです:
111 aaa 222 bbb 333 ccc 444 Chinese open_reader を使用して pyodps_iris テーブルからデータを読み取ります: 4.9 3.0 4.7 3.2 4.6 3.1