MaxCompute は、Spark on EMR、StarRocks、Presto、PAI、Hologres などのサードパーティエンジンをサポートしており、ストレージ API を呼び出すことによって、SDK を介して MaxCompute データに直接アクセスできます。このトピックでは、Python SDK を使用して MaxCompute にアクセスするためのコード例を紹介します。
MaxCompute はストレージ API を提供しています。詳細については、「aliyun-odps-python-sdk」をご参照ください。
前提条件
このトピックのコード例では、PyODPS を使用します。コードをローカルで実行するには、PyODPS をインストールする必要があります。詳細な手順については、「PyODPS をインストールする」をご参照ください。
PyODPS は、DataWorks および PAI Notebooks でもサポートされています。
DataWorks では、PyODPS ノードに PyODPS がプリインストールされています。 PyODPS ノードで直接 PyODPS タスクを開発およびスケジュールできます。詳細については、「DataWorks で PyODPS を使用する」をご参照ください。
PAI の Python 環境では、PyODPS をインストールして実行できます。 PAI のすべての組み込みイメージには PyODPS が含まれており、PAI-Designer のカスタム Python ウィジェットなどですぐに使用できます。 PAI Notebooks での PyODPS の使用方法 は、通常の使用方法とほぼ同じです。詳細については、「基本操作の概要」および「DataFrame(非推奨)」をご参照ください。
PyODPS は MaxCompute SDK の Python バージョンです。詳細については、「PyODPS」をご参照ください。
使用例
Python SDK を使用して MaxCompute にアクセスする方法のコード例については、「Python SDK の例」をご参照ください。
MaxCompute サービスに接続するように環境をセットアップします。
import os from odps import ODPS from odps.apis.storage_api import * # Alibaba Cloud アカウントの AccessKey ID に環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID を設定します。 # Alibaba Cloud アカウントの AccessKey シークレットに環境変数 ALIBABA_CLOUD_ACCESS_KEY_SECRET を設定します。 # AccessKey ID と AccessKey シークレットの文字列を直接使用しないことをお勧めします。 # エンドポイントは、MaxCompute サービスの接続アドレスです。現在、Alibaba Cloud VPC ネットワークのみがサポートされています。 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point' ) # MaxCompute テーブルの名前 table = "<table to access>" # MaxCompute へのアクセスに使用されるクォータの名前 quota_name = "<quota name>" # Alibaba Cloud MaxCompute サービスに接続してアクセスし、Arrow 形式に基づいてストレージ API オブジェクトを作成します def get_arrow_client(): odps_table = o.get_table(table) client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name) return client説明データ伝送サービス専用リソースグループ(サブスクリプション)のクォータ名を取得するには、次の手順に従います。
MaxCompute コンソール にログインします。左上隅でリージョンを切り替えたら、左側のナビゲーションウィンドウで [ワークエリア] > [クォータ] を選択して、使用可能なクォータのリストを表示します。詳細な手順については、「MaxCompute コンソールで計算リソースのクォータを管理する」をご参照ください。
ストレージ API:MaxCompute コンソール にログインします。左側のナビゲーションウィンドウで、[テナント] > [テナントプロパティ] を選択し、[ストレージ API スイッチ] を有効にします。
テーブル読み取り操作を実行します。
データ読み取りセッションを開始して、MaxCompute データを読み取ります。
import logging import sys from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) # 関数 create_read_session を定義します。mode パラメーターは、データのスキャン時に使用されるシャーディングポリシーを指定するために使用されます。mode が size の場合、シャーディングはデータサイズによって実行されます。row の場合、シャーディングは行数によって実行されます。 def create_read_session(mode): client = get_arrow_client() req = TableBatchScanRequest(required_partitions=['pt=test_write_1']) if mode == "size": req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE) elif mode == "row": req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET) resp = client.create_read_session(req) if resp.status != Status.OK: logger.info("Create read session failed") return logger.info("Read session id: " + resp.session_id) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide split mode: size|row") mode = sys.argv[1] if mode != "row" and mode != "size": raise ValueError("Please provide split mode: size|row") create_read_session(mode)データの読み取りステータスを監視および検証するためのセッションを確立します。
import logging import sys import time from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) # データ読み取り操作を実行する前に、読み取りセッションが正常に作成され、準備完了状態になっていることを確認します。 def check_session_status(session_id): client = get_arrow_client() req = SessionRequest(session_id=session_id) resp = client.get_read_session(req) if resp.status != Status.OK: logger.info("Get read session failed") return # セッションの作成プロセスには時間がかかる場合があります。データを読み取る前に、セッションステータスが NORMAL になるまで待つ必要があります。 if resp.session_status == SessionStatus.NORMAL: logger.info("Read session id: " + resp.session_id) else: logger.info("Session status is not expected") if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide session id") session_id = sys.argv[1] check_session_status(session_id)MaxCompute からデータを読み取ります。
# 指定された session_id を使用して MaxCompute からデータ行を読み取り、読み取られたデータ行の総数をカウントします。 import logging import sys from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) def read_rows(session_id): client = get_arrow_client() req = SessionRequest(session_id=session_id) resp = client.get_read_session(req) if resp.status != Status.OK and resp.status != Status.WAIT: logger.info("Get read session failed") return req = ReadRowsRequest(session_id=session_id) if resp.split_count == -1: req.row_index = 0 req.row_count = resp.record_count else: req.split_index = 0 reader = client.read_rows_arrow(req) total_line = 0 while True: record_batch = reader.read() if record_batch is None: break total_line += record_batch.num_rows if reader.get_status() != Status.OK: logger.info("Read rows failed") return logger.info("Total line is:" + str(total_line)) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide session id") session_id = sys.argv[1] read_rows(session_id)
参照
MaxCompute ストレージ API の詳細については、「ストレージ API の概要」をご参照ください。