All Products
Search
Document Center

MaxCompute:Open Storage Python SDK

Last Updated:Jun 24, 2026

Topik ini menyediakan contoh kode untuk mengakses data MaxCompute menggunakan SDK Python.

MaxCompute menyediakan antarmuka untuk Storage API. Untuk informasi selengkapnya, lihat aliyun-odps-python-sdk.

Prasyarat

Jika Anda menjalankan kode di lingkungan lokal, pastikan PyODPS telah diinstal. Untuk informasi selengkapnya, lihat Install PyODPS.

PyODPS juga tersedia di lingkungan berikut:

  • DataWorks: PyODPS telah pra-instal pada node PyODPS. Anda dapat mengembangkan dan menjalankan tugas PyODPS secara berkala langsung di node tersebut. Untuk informasi selengkapnya, lihat Use PyODPS in DataWorks.

  • PAI: Anda dapat menginstal dan menjalankan PyODPS di lingkungan Python PAI. PyODPS telah pra-instal di semua image PAI bawaan dan siap digunakan langsung di komponen seperti komponen Python kustom di PAI-Designer. Penggunaan PyODPS di PAI Notebook mirip dengan penggunaan standarnya. Untuk informasi selengkapnya, lihat Overview of basic operations dan DataFrame (Not recommended).

Catatan

PyODPS adalah SDK Python untuk MaxCompute. Untuk informasi selengkapnya tentang PyODPS, lihat PyODPS.

Contoh

Untuk contoh kode lengkap tentang cara mengakses MaxCompute menggunakan SDK Python, lihat Python SDK Examples.

  1. Konfigurasikan lingkungan untuk terhubung ke layanan MaxCompute

    import os
    from odps import ODPS
    from odps.apis.storage_api import *
    # Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID diatur ke Access Key ID Anda,
    # dan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET diatur ke Access Key Secret Anda.
    # Untuk keamanan, hindari hardcoding Access Key ID dan Access Key Secret.
    # Titik akhir layanan MaxCompute. Hanya koneksi dari jaringan VPC yang didukung.
    o = ODPS(
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    		project='your-default-project',
    		endpoint='your-end-point'
    )
    # Nama tabel MaxCompute yang akan diakses.
    table = "<table to access>"
    # Nama kuota yang digunakan untuk mengakses MaxCompute.
    quota_name = "<quota name>"
    # Terhubung ke layanan MaxCompute dan membuat client Storage API dalam format Arrow.
    def get_arrow_client():
        odps_table = o.get_table(table)
        client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)
    
        return client
    
    
    Catatan

    Untuk memperoleh nama kuota untuk kelompok sumber daya eksklusif Storage API (subscription):

    • Kelompok sumber daya eksklusif Storage API: Masuk ke Konsol MaxCompute. Di pojok kiri atas, alihkan ke wilayah Anda. Pada panel navigasi kiri, pilih Workspace > Quotas untuk melihat kuota yang tersedia. Untuk informasi selengkapnya, lihat Mengelola kuota untuk sumber daya komputasi.

    • Storage API: Masuk ke Konsol MaxCompute. Di panel navigasi sebelah kiri, pilih Tenants > Tenant Property untuk mengaktifkan Storage API.

  2. Baca data tabel

    1. Buat sesi baca untuk membaca data dari MaxCompute

      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # Membuat sesi baca. Parameter mode menentukan strategi pemisahan: 'size' untuk memisahkan berdasarkan ukuran data, atau 'row' untuk memisahkan berdasarkan offset baris.
      def create_read_session(mode):
          client = get_arrow_client()
          req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])
      
          if mode == "size":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
          elif mode == "row":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)
      
          resp = client.create_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Create read session failed")
              return
      
          logger.info("Read session id: " + resp.session_id)
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide split mode: size|row")
      
          mode = sys.argv[1]
          if mode != "row" and mode != "size":
              raise ValueError("Please provide split mode: size|row")
      
          create_read_session(mode)
      
      
    2. Periksa status sesi baca

      import logging
      import sys
      import time
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # Sebelum membaca data, pastikan sesi baca telah dibuat dan siap.
      def check_session_status(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Get read session failed")
              return
      
          # Pembuatan sesi bisa memakan waktu. Anda harus menunggu hingga status sesi menjadi NORMAL sebelum membaca data.
          if resp.session_status == SessionStatus.NORMAL:
              logger.info("Read session id: " + resp.session_id)
          else:
              logger.info("Session status is not expected")
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          check_session_status(session_id)
      
      
    3. Baca data MaxCompute

      # Membaca baris data dari MaxCompute untuk session_id tertentu dan menghitung jumlah total baris.
      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      
      def read_rows(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK and resp.status != Status.WAIT:
              logger.info("Get read session failed")
              return
      
          req = ReadRowsRequest(session_id=session_id)
          if resp.split_count == -1:
              req.row_index = 0
              req.row_count = resp.record_count
          else:
              req.split_index = 0
      
          reader = client.read_rows_arrow(req)
          total_line = 0
          while True:
              record_batch = reader.read()
              if record_batch is None:
                  break
              total_line += record_batch.num_rows
      
          if reader.get_status() != Status.OK:
              logger.info("Read rows failed")
              return
      
          logger.info("Total line is:" + str(total_line))
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          read_rows(session_id)
      
      

Dokumen terkait

Untuk informasi selengkapnya tentang Storage API MaxCompute, lihat Storage API overview.