All Products
Search
Document Center

MaxCompute:Contoh SDK Python

Last Updated:Jul 06, 2025

MaxCompute mendukung mesin pihak ketiga seperti Spark on EMR, StarRocks, Presto, PAI, dan Hologres, memungkinkan akses langsung ke data MaxCompute melalui SDK dengan memanggil API Penyimpanan. Topik ini menyediakan contoh kode untuk mengakses MaxCompute menggunakan SDK Python.

MaxCompute menawarkan API penyimpanan. Untuk informasi lebih lanjut, lihat aliyun-odps-python-sdk.

Prasyarat

Contoh kode dalam topik ini menggunakan PyODPS. Untuk menjalankan kode secara lokal, Anda perlu menginstal PyODPS. Untuk instruksi terperinci, lihat Instal PyODPS.

PyODPS juga didukung di DataWorks dan Notebook PAI:

  • Di DataWorks, node PyODPS dilengkapi dengan PyODPS yang sudah diinstal sebelumnya. Anda dapat mengembangkan dan menjadwalkan tugas PyODPS langsung pada node PyODPS. Untuk detailnya, lihat Gunakan PyODPS di DataWorks.

  • Lingkungan Python PAI mencakup instalasi PyODPS. Semua gambar bawaan PAI mencakup PyODPS, sehingga memungkinkan penggunaan langsung, seperti di widget Python kustom PAI-Designer. Cara menggunakan PyODPS di Notebook PAI mirip dengan cara biasa Anda menggunakannya. Untuk detail lebih lanjut, lihat Ikhtisar Operasi Dasar dan DataFrame (tidak direkomendasikan).

Catatan

PyODPS adalah versi Python dari SDK MaxCompute. Untuk detail lebih lanjut, lihat PyODPS.

Contoh Penggunaan

Untuk contoh kode tentang cara mengakses MaxCompute menggunakan SDK Python, lihat Contoh SDK Python.

  1. Siapkan lingkungan untuk terhubung ke layanan MaxCompute.

    import os
    from odps import ODPS
    from odps.apis.storage_api import *
    # Setel variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID ke ID AccessKey akun Alibaba Cloud Anda.
    # Setel variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET ke Rahasia AccessKey akun Alibaba Cloud Anda.
    # Kami sarankan agar Anda tidak langsung menggunakan string ID AccessKey dan Rahasia AccessKey Anda.
    # Titik akhir adalah alamat koneksi layanan MaxCompute. Saat ini, hanya jaringan VPC Alibaba Cloud yang didukung.
    o = ODPS(
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    		project='proyek-default-anda',
    		endpoint='titik-akhir-anda'
    )
    # Nama tabel MaxCompute
    table = "<tabel untuk diakses>"
    # Nama kuota yang digunakan untuk mengakses MaxCompute
    quota_name = "<nama kuota>"
    # Terhubung ke dan mengakses layanan Alibaba Cloud MaxCompute dan membuat objek API Penyimpanan berdasarkan format Arrow
    def get_arrow_client():
        odps_table = o.get_table(table)
        client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)
    
        return client
    Catatan

    Untuk mendapatkan nama kuota untuk grup sumber daya eksklusif Layanan Transmisi Data (langganan) , ikuti langkah-langkah berikut:

  2. Lakukan operasi pembacaan tabel.

    1. Mulai sesi pembacaan data untuk membaca data MaxCompute.

      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # Tentukan fungsi create_read_session. Parameter mode digunakan untuk menentukan strategi sharding yang digunakan saat memindai data. Jika modenya adalah size, sharding dilakukan berdasarkan ukuran data. Jika row, sharding dilakukan berdasarkan jumlah baris.
      def create_read_session(mode):
          client = get_arrow_client()
          req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])
      
          if mode == "size":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
          elif mode == "row":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)
      
          resp = client.create_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Pembuatan sesi baca gagal")
              return
      
          logger.info("ID sesi baca: " + resp.session_id)
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Harap berikan mode split: size|row")
      
          mode = sys.argv[1]
          if mode != "row" and mode != "size":
              raise ValueError("Harap berikan mode split: size|row")
      
          create_read_session(mode)
      
      
    2. Buat sesi untuk memantau dan memverifikasi status pembacaan data.

      import logging
      import sys
      import time
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # Pastikan bahwa sesi baca berhasil dibuat dan dalam status siap sebelum melakukan operasi pembacaan data.
      def check_session_status(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Mendapatkan sesi baca gagal")
              return
      
          # Proses pembuatan sesi mungkin memakan waktu lama. Anda perlu menunggu hingga status sesi NORMAL sebelum membaca data.
          if resp.session_status == SessionStatus.NORMAL:
              logger.info("ID sesi baca: " + resp.session_id)
          else:
              logger.info("Status sesi tidak sesuai harapan")
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Harap berikan ID sesi")
      
          session_id = sys.argv[1]
          check_session_status(session_id)
      
      
    3. Baca data dari MaxCompute.

      # Baca baris data dari MaxCompute menggunakan session_id yang ditentukan dan hitung total jumlah baris data yang dibaca.
      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      
      def read_rows(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK and resp.status != Status.WAIT:
              logger.info("Mendapatkan sesi baca gagal")
              return
      
          req = ReadRowsRequest(session_id=session_id)
          if resp.split_count == -1:
              req.row_index = 0
              req.row_count = resp.record_count
          else:
              req.split_index = 0
      
      
      
          reader = client.read_rows_arrow(req)
          total_line = 0
          while True:
              record_batch = reader.read()
              if record_batch is None:
                  break
              total_line += record_batch.num_rows
      
          if reader.get_status() != Status.OK:
              logger.info("Membaca baris gagal")
              return
      
          logger.info("Total baris adalah:" + str(total_line))
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Harap berikan ID sesi")
      
          session_id = sys.argv[1]
          read_rows(session_id)
      
      
      
      

Referensi

Untuk informasi lebih lanjut tentang API penyimpanan MaxCompute, lihat Ikhtisar API Penyimpanan.