すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Open Storage Python SDK

最終更新日:Jun 24, 2026

このトピックでは、Python SDK を使用して MaxCompute データにアクセスするためのコード例を紹介します。

MaxCompute は、ストレージ API のインターフェースを提供しています。詳細については、「aliyun-odps-python-sdk」をご参照ください。

前提条件

ローカル環境でコードを実行する場合は、PyODPS がインストールされていることを確認してください。詳細については、「PyODPS のインストール」をご参照ください。

PyODPS は、次の環境でも利用できます。

  • DataWorks:PyODPS ノードには PyODPS がプリインストールされています。これらのノードで直接 PyODPS タスクを開発し、定期的に実行できます。詳細については、「DataWorks での PyODPS の使用」をご参照ください。

  • PAI:PAI Python 環境に PyODPS をインストールして実行できます。PyODPS はすべてのビルトイン PAI イメージにプリインストールされており、PAI-Designer のカスタム Python コンポーネントなどで、すぐに使用できます。PAI Notebooks での PyODPS の使用方法は、標準的な使用方法と同様です。詳細については、「基本操作の概要」および「DataFrame (非推奨)」をご参照ください。

説明

PyODPS は、MaxCompute の Python SDK です。PyODPS の詳細については、「PyODPS」をご参照ください。

Python SDK を使用して MaxCompute にアクセスするための完全なコード例については、「Python SDK Examples」をご参照ください。

  1. MaxCompute サービスに接続するための環境を構成します。

    import os
    from odps import ODPS
    from odps.apis.storage_api import *
    # ALIBABA_CLOUD_ACCESS_KEY_ID 環境変数にはアクセスキー ID が、
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数にはシークレットアクセスキーが設定されていることを確認してください。
    # セキュリティのため、アクセスキー ID とシークレットアクセスキーのハードコーディングは避けてください。
    # MaxCompute サービスのエンドポイント。VPC ネットワークからの接続のみがサポートされます。
    o = ODPS(
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    		project='your-default-project',
    		endpoint='your-end-point'
    )
    # アクセスする MaxCompute テーブルの名前。
    table = "<table to access>"
    # MaxCompute へのアクセスに使用するクォータの名前。
    quota_name = "<quota name>"
    # MaxCompute サービスに接続し、Arrow フォーマットのストレージ API クライアントを作成します。
    def get_arrow_client():
        odps_table = o.get_table(table)
        client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)
    
        return client
    
    
    説明

    ストレージ API 専用リソースグループ (サブスクリプション) のクォータ名を取得するには:

    • ストレージ API 専用リソースグループ: MaxCompute コンソールにログインします。 左上隅で、お使いのリージョンに切り替えます。 左側のナビゲーションペインで、ワークスペース > クォータ管理 を選択して、利用可能なクォータを表示します。 詳細については、「コンピューティングリソースのクォータの管理」をご参照ください。

    • Storage API: MaxCompute コンソールにログインします。左側のナビゲーションペインで、[テナント] > [テナントプロパティ] を選択して Storage API を有効にします。

  2. テーブルデータを読み取ります。

    1. MaxCompute からデータを読み取るための読み取りセッションを作成します。

      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # 読み取りセッションを作成します。`mode` パラメータは分割戦略を指定します。データサイズで分割する場合は 'size'、行オフセットで分割する場合は 'row' を指定します。
      def create_read_session(mode):
          client = get_arrow_client()
          req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])
      
          if mode == "size":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
          elif mode == "row":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)
      
          resp = client.create_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Create read session failed")
              return
      
          logger.info("Read session id: " + resp.session_id)
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide split mode: size|row")
      
          mode = sys.argv[1]
          if mode != "row" and mode != "size":
              raise ValueError("Please provide split mode: size|row")
      
          create_read_session(mode)
      
      
    2. 読み取りセッションのステータスを確認します。

      import logging
      import sys
      import time
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # データを読み取る前に、読み取りセッションが作成され、準備ができていることを確認してください。
      def check_session_status(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK and resp.status != Status.WAIT:
              logger.info("Get read session failed")
              return
      
          # セッションの作成には時間がかかる場合があります。データを読み取る前に、セッションステータスが `NORMAL` になるまで待つ必要があります。
          if resp.session_status == SessionStatus.NORMAL:
              logger.info("Read session id: " + resp.session_id)
          else:
              logger.info("Session status is not expected")
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          check_session_status(session_id)
      
      
    3. MaxCompute データを読み取ります。

      # 指定された `session_id` を使用して MaxCompute からデータ行を読み取り、行の総数をカウントします。
      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      
      def read_rows(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK and resp.status != Status.WAIT:
              logger.info("Get read session failed")
              return
      
          req = ReadRowsRequest(session_id=session_id)
          if resp.split_count == -1:
              req.row_index = 0
              req.row_count = resp.record_count
          else:
              req.split_index = 0
      
          reader = client.read_rows_arrow(req)
          total_line = 0
          while True:
              record_batch = reader.read()
              if record_batch is None:
                  break
              total_line += record_batch.num_rows
      
          if reader.get_status() != Status.OK:
              logger.info("Read rows failed")
              return
      
          logger.info("Total line is:" + str(total_line))
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          read_rows(session_id)
      
      

関連ドキュメント

MaxCompute ストレージ API の詳細については、「ストレージ API の概要」をご参照ください。