全部產品
Search
文件中心

MaxCompute:開放儲存SDK樣本-Python SDK

更新時間:Jun 05, 2025

MaxCompute支援第三方引擎(如Spark on EMR、StarRocks、Presto、PAI和Hologres)通過SDK調用Storage API直接存取MaxCompute資料,本文為您介紹使用Python SDK訪問MaxCompute的程式碼範例。

MaxCompute提供了開放儲存相關介面,詳情請參見aliyun-odps-python-sdk

前提條件

本文範例程式碼是基於PyODPS,若您是在本地環境執行以下代碼,請確保已安裝PyODPS。具體操作,請參見安裝PyODPS

同時PyODPS還支援在DataWorks、PAI Notebooks中使用,其中:

  • DataWorks的PyODPS節點已安裝好了PyODPS,您可以直接在DataWorks的PyODPS節點上開發PyODPS任務並周期性運行,操作指導請參見通過DataWorks使用PyODPS

  • PAI的Python環境也可安裝運行PyODPS,其中PAI的內建鏡像均已安裝好了PyODPS可直接使用,如PAI-Designer的自訂Python組件,在PAI Notebooks中使用PyODPS的方式與通用的使用方式基本一致,可參考基本操作概述DataFrame(不推薦使用)

說明

PyODPS是MaxCompute的Python版本的SDK,關於PyODPS詳情,請參見PyODPS

使用樣本

使用Python SDK訪問MaxCompute的程式碼範例,詳情請參見Python SDK Examples

  1. 配置串連MaxCompute服務的環境

    import os
    from odps import ODPS
    from odps.apis.storage_api import *
    # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret,
    # 不建議直接使用 Access Key ID / Access Key Secret 字串
    # endpoint為MaxCompute服務的串連地址,當前僅支援使用阿里雲VPC網路
    o = ODPS(
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    		project='your-default-project',
    		endpoint='your-end-point'
    )
    #MaxCompute表名稱
    table = "<table to access>"
    # 訪問MaxCompute使用的Quota名稱
    quota_name = "<quota name>"
    # 串連並訪問阿里雲MaxCompute服務並建立基於Arrow格式的Storage API對象
    def get_arrow_client():
        odps_table = o.get_table(table)
        client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)
    
        return client
    
    
    說明

    擷取獨享Data Transmission Service資源群組(訂用帳戶)資源的Quota名稱的方式分別如下:

    • 獨享Data Transmission Service資源群組:登入MaxCompute控制台,左上方切換地區後,在左側導覽列選擇工作區>配額(Quota)管理,查看可使用的Quota列表。具體操作,請參見計算資源-Quota管理

    • 開放儲存:登入MaxCompute控制台,在左側導覽列選擇租戶管理>租戶屬性,開啟開放儲存。

  2. 讀表操作。

    1. 建立資料讀取會話,讀取MaxCompute資料。

      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # 定義函數create_read_session,mode參數用於指定掃描資料時所使用的分區策略,若mode為size,按資料大小進行分區,若為row,按行數進行分區
      def create_read_session(mode):
          client = get_arrow_client()
          req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])
      
          if mode == "size":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
          elif mode == "row":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)
      
          resp = client.create_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Create read session failed")
              return
      
          logger.info("Read session id: " + resp.session_id)
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide split mode: size|row")
      
          mode = sys.argv[1]
          if mode != "row" and mode != "size":
              raise ValueError("Please provide split mode: size|row")
      
          create_read_session(mode)
      
      
    2. 建立監控和檢查資料讀取狀態的會話。

      import logging
      import sys
      import time
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # 確保在執行資料讀取操作前,確認read session它已經成功建立並處於準備就緒的狀態
      def check_session_status(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Get read session failed")
              return
      
          # session建立過程可能時間較長,需要等待session status為NORMAL才可以讀資料
          if resp.session_status == SessionStatus.NORMAL:
              logger.info("Read session id: " + resp.session_id)
          else:
              logger.info("Session status is not expected")
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          check_session_status(session_id)
      
      
    3. 讀取MaxCompute資料。

      # 通過指定的session_id從MaxCompute中讀取資料行,並統計總共讀取的資料行數
      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      
      def read_rows(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK and resp.status != Status.WAIT:
              logger.info("Get read session failed")
              return
      
          req = ReadRowsRequest(session_id=session_id)
          if resp.split_count == -1:
              req.row_index = 0
              req.row_count = resp.record_count
          else:
              req.split_index = 0
      
          reader = client.read_rows_arrow(req)
          total_line = 0
          while True:
              record_batch = reader.read()
              if record_batch is None:
                  break
              total_line += record_batch.num_rows
      
          if reader.get_status() != Status.OK:
              logger.info("Read rows failed")
              return
      
          logger.info("Total line is:" + str(total_line))
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          read_rows(session_id)
      
      

相關文檔

關於MaxCompute開放儲存詳情,請參見開放儲存概述