すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Python SDK の例

最終更新日:Jul 09, 2025

MaxCompute は、Spark on EMR、StarRocks、Presto、PAI、Hologres などのサードパーティエンジンをサポートしており、ストレージ API を呼び出すことによって、SDK を介して MaxCompute データに直接アクセスできます。このトピックでは、Python SDK を使用して MaxCompute にアクセスするためのコード例を紹介します。

MaxCompute はストレージ API を提供しています。詳細については、「aliyun-odps-python-sdk」をご参照ください。

前提条件

このトピックのコード例では、PyODPS を使用します。コードをローカルで実行するには、PyODPS をインストールする必要があります。詳細な手順については、「PyODPS をインストールする」をご参照ください。

PyODPS は、DataWorks および PAI Notebooks でもサポートされています。

  • DataWorks では、PyODPS ノードに PyODPS がプリインストールされています。 PyODPS ノードで直接 PyODPS タスクを開発およびスケジュールできます。詳細については、「DataWorks で PyODPS を使用する」をご参照ください。

  • PAI の Python 環境では、PyODPS をインストールして実行できます。 PAI のすべての組み込みイメージには PyODPS が含まれており、PAI-Designer のカスタム Python ウィジェットなどですぐに使用できます。 PAI Notebooks での PyODPS の使用方法 は、通常の使用方法とほぼ同じです。詳細については、「基本操作の概要」および「DataFrame(非推奨)」をご参照ください。

説明

PyODPS は MaxCompute SDK の Python バージョンです。詳細については、「PyODPS」をご参照ください。

使用例

Python SDK を使用して MaxCompute にアクセスする方法のコード例については、「Python SDK の例」をご参照ください。

  1. MaxCompute サービスに接続するように環境をセットアップします

    import os
    from odps import ODPS
    from odps.apis.storage_api import *
    # Alibaba Cloud アカウントの AccessKey ID に環境変数 ALIBABA_CLOUD_ACCESS_KEY_ID を設定します。
    # Alibaba Cloud アカウントの AccessKey シークレットに環境変数 ALIBABA_CLOUD_ACCESS_KEY_SECRET を設定します。
    # AccessKey ID と AccessKey シークレットの文字列を直接使用しないことをお勧めします。
    # エンドポイントは、MaxCompute サービスの接続アドレスです。現在、Alibaba Cloud VPC ネットワークのみがサポートされています。
    o = ODPS(
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    		project='your-default-project',
    		endpoint='your-end-point'
    )
    # MaxCompute テーブルの名前
    table = "<table to access>"
    # MaxCompute へのアクセスに使用されるクォータの名前
    quota_name = "<quota name>"
    # Alibaba Cloud MaxCompute サービスに接続してアクセスし、Arrow 形式に基づいてストレージ API オブジェクトを作成します
    def get_arrow_client():
        odps_table = o.get_table(table)
        client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)
    
        return client
    説明

    データ伝送サービス専用リソースグループ(サブスクリプション)のクォータ名を取得するには、次の手順に従います。

    • MaxCompute コンソール にログインします。左上隅でリージョンを切り替えたら、左側のナビゲーションウィンドウで [ワークエリア] > [クォータ] を選択して、使用可能なクォータのリストを表示します。詳細な手順については、「MaxCompute コンソールで計算リソースのクォータを管理する」をご参照ください。

    • ストレージ API:MaxCompute コンソール にログインします。左側のナビゲーションウィンドウで、[テナント] > [テナントプロパティ] を選択し、[ストレージ API スイッチ] を有効にします。

  2. テーブル読み取り操作を実行します。

    1. データ読み取りセッションを開始して、MaxCompute データを読み取ります。

      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # 関数 create_read_session を定義します。mode パラメーターは、データのスキャン時に使用されるシャーディングポリシーを指定するために使用されます。mode が size の場合、シャーディングはデータサイズによって実行されます。row の場合、シャーディングは行数によって実行されます。
      def create_read_session(mode):
          client = get_arrow_client()
          req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])
      
          if mode == "size":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
          elif mode == "row":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)
      
          resp = client.create_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Create read session failed")
              return
      
          logger.info("Read session id: " + resp.session_id)
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide split mode: size|row")
      
          mode = sys.argv[1]
          if mode != "row" and mode != "size":
              raise ValueError("Please provide split mode: size|row")
      
          create_read_session(mode)
      
      
    2. データの読み取りステータスを監視および検証するためのセッションを確立します。

      import logging
      import sys
      import time
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # データ読み取り操作を実行する前に、読み取りセッションが正常に作成され、準備完了状態になっていることを確認します。
      def check_session_status(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Get read session failed")
              return
      
          # セッションの作成プロセスには時間がかかる場合があります。データを読み取る前に、セッションステータスが NORMAL になるまで待つ必要があります。
          if resp.session_status == SessionStatus.NORMAL:
              logger.info("Read session id: " + resp.session_id)
          else:
              logger.info("Session status is not expected")
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          check_session_status(session_id)
      
      
    3. MaxCompute からデータを読み取ります。

      # 指定された session_id を使用して MaxCompute からデータ行を読み取り、読み取られたデータ行の総数をカウントします。
      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      
      def read_rows(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK and resp.status != Status.WAIT:
              logger.info("Get read session failed")
              return
      
          req = ReadRowsRequest(session_id=session_id)
          if resp.split_count == -1:
              req.row_index = 0
              req.row_count = resp.record_count
          else:
              req.split_index = 0
      
      
      
          reader = client.read_rows_arrow(req)
          total_line = 0
          while True:
              record_batch = reader.read()
              if record_batch is None:
                  break
              total_line += record_batch.num_rows
      
          if reader.get_status() != Status.OK:
              logger.info("Read rows failed")
              return
      
          logger.info("Total line is:" + str(total_line))
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          read_rows(session_id)
      
      
      
      

参照

MaxCompute ストレージ API の詳細については、「ストレージ API の概要」をご参照ください。