PyODPS は MaxCompute の Python SDK です。MaxCompute のジョブの作成、テーブルやビューのクエリ、リソース管理を行うための Python インターフェイスを提供します。PyODPS は、ファイルのアップロードとダウンロード、テーブル作成、SQL クエリ、MapReduce ジョブの送信、およびユーザー定義関数 (UDF) をサポートしています。
機能
サポートツール
PyODPS は、ローカル環境、DataWorks、および PAI ノートブックで実行できます。
PyODPS タスクのためにローカルマシンに全データをダウンロードすると、メモリ不足 (OOM) エラーが発生する可能性があるため、避けてください。タスクを MaxCompute に送信して分散実行してください。重要:PyODPS を実行するためにローカルマシンに全データをダウンロードしないでください。
-
ローカル環境:ローカルに PyODPS をインストールして使用します。ローカル環境での PyODPS の使用。
-
DataWorks:DataWorks の PyODPS ノードには PyODPS がプリインストールされており、タスクの開発とスケジューリングに使用できます。DataWorks での PyODPS の使用。
-
PAI ノートブック:PAI Python 環境で PyODPS をインストールして実行できます。PyODPS は、PAI-Designer のカスタム Python コンポーネントを含む、PAI の組み込みイメージにもプリインストールされています。使い方は標準的な方法と同じです。基本的な操作およびDataFrame (非推奨)。
重要:PyODPS を実行するためにローカルマシンに全データをダウンロードしないでください
PyODPS は、PC、DataWorks の PyODPS ノード、PAI ノートブック環境など、さまざまなクライアントで動作します。
PyODPS は、トンネルダウンロード、execute、to_pandas などの操作を提供し、データをローカルマシンにプルします。新規ユーザーは、データをローカルにプルし、処理してから再度アップロードすることがよくありますが、このアプローチは MaxCompute の分散コンピューティング機能をバイパスするため非効率です。
|
データ処理方法 |
説明 |
シナリオ例 |
|
ローカルでの処理 (非推奨、OOM のリスクあり) |
DataWorks の PyODPS ノードは、PyODPS パッケージが組み込まれたリソースに制約のあるクライアントコンテナです。MaxCompute のコンピューティングリソースは使用せず、厳しいメモリ制限があります。 |
|
|
MaxCompute での分散実行 (推奨) |
PyODPS の分散 DataFrame を使用して、計算集約型のタスクをローカルクライアントノードで処理する代わりに MaxCompute に送信します。 説明
SQL の結果を DataFrame に変換するには、
|
PyODPS DataFrame の これらの操作は SQL に変換され、MaxCompute 上で分散コンピューティングが行われます。これにより、ローカルメモリの消費を最小限に抑え、単一マシンでの処理に比べて大幅なパフォーマンス向上が得られます。 |
次の形態素解析の例では、両方の方法を比較します。
-
シナリオ例:
あるユーザーが、単一列の文字列テーブルから日々のログ文字列を分析します。目標は、jieba ライブラリを使用して中国語の文を形態素解析し、キーワードを見つけて、結果を新しいテーブルに保存することです。
-
非効率な処理コードのデモ:
import jieba t = o.get_table('word_split') out = [] with t.open_reader() as reader: for r in reader: words = list(jieba.cut(r[0])) # # processed_data を生成するための処理ロジック # out.append(processed_data) out_t = o.get_table('words') with out_t.open_writer() as writer: writer.write(out)この単一マシンのアプローチでは、データを 1 行ずつ読み取り、処理し、書き込みます。ダウンロードとアップロードのサイクルは遅く、メモリを大量に消費します。DataWorks ノードでは、デフォルトのメモリが限られているため、ジョブが OOM エラーに見舞われることがよくあります。
-
効率的な処理コードのデモ:
from odps.df import output out_table = o.get_table('words') df = o.get_table('word_split').to_df() # 返されるフィールドと型は次のとおりと仮定します out_names = ["word", "count"] out_types = ["string", "int"] @output(out_names, out_types) def handle(row): import jieba words = list(jieba.cut(row[0])) # # processed_data を生成するための処理ロジック # yield processed_data df.apply(handle, axis=1).persist(out_table.name)applyを使用して分散実行を実装します:-
handle関数はシリアル化され、サーバーサイドで UDF として実行されます。コアとなる行ごとのロジックは同じままですが、MaxCompute はそれを複数のマシンで並列に実行します。 -
persistインターフェイスは、出力をMaxComputeテーブルに直接書き込むことで、すべてのデータをクラスター内に保持し、ローカルの帯域幅とメモリを節約します。 -
MaxCompute は UDF で
jiebaのようなサードパーティパッケージをサポートしているため、最小限のコード変更で分散コンピューティングを活用できます。
-
制限
-
サンドボックスの制限により、ローカルの Pandas バックエンドでのテストに合格したプログラムが MaxCompute で実行できない場合があります。