このトピックでは、Python SDK を使用して MaxCompute データにアクセスするためのコード例を紹介します。
MaxCompute は、ストレージ API のインターフェースを提供しています。詳細については、「aliyun-odps-python-sdk」をご参照ください。
前提条件
ローカル環境でコードを実行する場合は、PyODPS がインストールされていることを確認してください。詳細については、「PyODPS のインストール」をご参照ください。
PyODPS は、次の環境でも利用できます。
DataWorks:PyODPS ノードには PyODPS がプリインストールされています。これらのノードで直接 PyODPS タスクを開発し、定期的に実行できます。詳細については、「DataWorks での PyODPS の使用」をご参照ください。
PAI:PAI Python 環境に PyODPS をインストールして実行できます。PyODPS はすべてのビルトイン PAI イメージにプリインストールされており、PAI-Designer のカスタム Python コンポーネントなどで、すぐに使用できます。PAI Notebooks での PyODPS の使用方法は、標準的な使用方法と同様です。詳細については、「基本操作の概要」および「DataFrame (非推奨)」をご参照ください。
PyODPS は、MaxCompute の Python SDK です。PyODPS の詳細については、「PyODPS」をご参照ください。
例
Python SDK を使用して MaxCompute にアクセスするための完全なコード例については、「Python SDK Examples」をご参照ください。
MaxCompute サービスに接続するための環境を構成します。
import os from odps import ODPS from odps.apis.storage_api import * # ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数にはアクセスキー ID が、 # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数にはシークレットアクセスキーが設定されていることを確認してください。 # セキュリティのため、アクセスキー ID とシークレットアクセスキーのハードコーディングは避けてください。 # MaxCompute サービスのエンドポイント。VPC ネットワークからの接続のみがサポートされます。 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point' ) # アクセスする MaxCompute テーブルの名前。 table = "<table to access>" # MaxCompute へのアクセスに使用するクォータの名前。 quota_name = "<quota name>" # MaxCompute サービスに接続し、Arrow フォーマットのストレージ API クライアントを作成します。 def get_arrow_client(): odps_table = o.get_table(table) client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name) return client説明ストレージ API 専用リソースグループ (サブスクリプション) のクォータ名を取得するには:
ストレージ API 専用リソースグループ: MaxCompute コンソールにログインします。 左上隅で、お使いのリージョンに切り替えます。 左側のナビゲーションペインで、ワークスペース > クォータ管理 を選択して、利用可能なクォータを表示します。 詳細については、「コンピューティングリソースのクォータの管理」をご参照ください。
Storage API: MaxCompute コンソールにログインします。左側のナビゲーションペインで、[テナント] > [テナントプロパティ] を選択して Storage API を有効にします。
テーブルデータを読み取ります。
MaxCompute からデータを読み取るための読み取りセッションを作成します。
import logging import sys from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) # 読み取りセッションを作成します。`mode` パラメータは分割戦略を指定します。データサイズで分割する場合は 'size'、行オフセットで分割する場合は 'row' を指定します。 def create_read_session(mode): client = get_arrow_client() req = TableBatchScanRequest(required_partitions=['pt=test_write_1']) if mode == "size": req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE) elif mode == "row": req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET) resp = client.create_read_session(req) if resp.status != Status.OK: logger.info("Create read session failed") return logger.info("Read session id: " + resp.session_id) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide split mode: size|row") mode = sys.argv[1] if mode != "row" and mode != "size": raise ValueError("Please provide split mode: size|row") create_read_session(mode)読み取りセッションのステータスを確認します。
import logging import sys import time from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) # データを読み取る前に、読み取りセッションが作成され、準備ができていることを確認してください。 def check_session_status(session_id): client = get_arrow_client() req = SessionRequest(session_id=session_id) resp = client.get_read_session(req) if resp.status != Status.OK and resp.status != Status.WAIT: logger.info("Get read session failed") return # セッションの作成には時間がかかる場合があります。データを読み取る前に、セッションステータスが `NORMAL` になるまで待つ必要があります。 if resp.session_status == SessionStatus.NORMAL: logger.info("Read session id: " + resp.session_id) else: logger.info("Session status is not expected") if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide session id") session_id = sys.argv[1] check_session_status(session_id)MaxCompute データを読み取ります。
# 指定された `session_id` を使用して MaxCompute からデータ行を読み取り、行の総数をカウントします。 import logging import sys from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) def read_rows(session_id): client = get_arrow_client() req = SessionRequest(session_id=session_id) resp = client.get_read_session(req) if resp.status != Status.OK and resp.status != Status.WAIT: logger.info("Get read session failed") return req = ReadRowsRequest(session_id=session_id) if resp.split_count == -1: req.row_index = 0 req.row_count = resp.record_count else: req.split_index = 0 reader = client.read_rows_arrow(req) total_line = 0 while True: record_batch = reader.read() if record_batch is None: break total_line += record_batch.num_rows if reader.get_status() != Status.OK: logger.info("Read rows failed") return logger.info("Total line is:" + str(total_line)) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide session id") session_id = sys.argv[1] read_rows(session_id)
関連ドキュメント
MaxCompute ストレージ API の詳細については、「ストレージ API の概要」をご参照ください。