Topik ini menyediakan contoh kode untuk mengakses data MaxCompute menggunakan SDK Python.
MaxCompute menyediakan antarmuka untuk Storage API. Untuk informasi selengkapnya, lihat aliyun-odps-python-sdk.
Prasyarat
Jika Anda menjalankan kode di lingkungan lokal, pastikan PyODPS telah diinstal. Untuk informasi selengkapnya, lihat Install PyODPS.
PyODPS juga tersedia di lingkungan berikut:
DataWorks: PyODPS telah pra-instal pada node PyODPS. Anda dapat mengembangkan dan menjalankan tugas PyODPS secara berkala langsung di node tersebut. Untuk informasi selengkapnya, lihat Use PyODPS in DataWorks.
PAI: Anda dapat menginstal dan menjalankan PyODPS di lingkungan Python PAI. PyODPS telah pra-instal di semua image PAI bawaan dan siap digunakan langsung di komponen seperti komponen Python kustom di PAI-Designer. Penggunaan PyODPS di PAI Notebook mirip dengan penggunaan standarnya. Untuk informasi selengkapnya, lihat Overview of basic operations dan DataFrame (Not recommended).
PyODPS adalah SDK Python untuk MaxCompute. Untuk informasi selengkapnya tentang PyODPS, lihat PyODPS.
Contoh
Untuk contoh kode lengkap tentang cara mengakses MaxCompute menggunakan SDK Python, lihat Python SDK Examples.
Konfigurasikan lingkungan untuk terhubung ke layanan MaxCompute
import os from odps import ODPS from odps.apis.storage_api import * # Pastikan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID diatur ke Access Key ID Anda, # dan variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET diatur ke Access Key Secret Anda. # Untuk keamanan, hindari hardcoding Access Key ID dan Access Key Secret. # Titik akhir layanan MaxCompute. Hanya koneksi dari jaringan VPC yang didukung. o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point' ) # Nama tabel MaxCompute yang akan diakses. table = "<table to access>" # Nama kuota yang digunakan untuk mengakses MaxCompute. quota_name = "<quota name>" # Terhubung ke layanan MaxCompute dan membuat client Storage API dalam format Arrow. def get_arrow_client(): odps_table = o.get_table(table) client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name) return clientCatatanUntuk memperoleh nama kuota untuk kelompok sumber daya eksklusif Storage API (subscription):
Kelompok sumber daya eksklusif Storage API: Masuk ke Konsol MaxCompute. Di pojok kiri atas, alihkan ke wilayah Anda. Pada panel navigasi kiri, pilih Workspace > Quotas untuk melihat kuota yang tersedia. Untuk informasi selengkapnya, lihat Mengelola kuota untuk sumber daya komputasi.
Storage API: Masuk ke Konsol MaxCompute. Di panel navigasi sebelah kiri, pilih Tenants > Tenant Property untuk mengaktifkan Storage API.
Baca data tabel
Buat sesi baca untuk membaca data dari MaxCompute
import logging import sys from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) # Membuat sesi baca. Parameter mode menentukan strategi pemisahan: 'size' untuk memisahkan berdasarkan ukuran data, atau 'row' untuk memisahkan berdasarkan offset baris. def create_read_session(mode): client = get_arrow_client() req = TableBatchScanRequest(required_partitions=['pt=test_write_1']) if mode == "size": req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE) elif mode == "row": req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET) resp = client.create_read_session(req) if resp.status != Status.OK: logger.info("Create read session failed") return logger.info("Read session id: " + resp.session_id) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide split mode: size|row") mode = sys.argv[1] if mode != "row" and mode != "size": raise ValueError("Please provide split mode: size|row") create_read_session(mode)Periksa status sesi baca
import logging import sys import time from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) # Sebelum membaca data, pastikan sesi baca telah dibuat dan siap. def check_session_status(session_id): client = get_arrow_client() req = SessionRequest(session_id=session_id) resp = client.get_read_session(req) if resp.status != Status.OK: logger.info("Get read session failed") return # Pembuatan sesi bisa memakan waktu. Anda harus menunggu hingga status sesi menjadi NORMAL sebelum membaca data. if resp.session_status == SessionStatus.NORMAL: logger.info("Read session id: " + resp.session_id) else: logger.info("Session status is not expected") if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide session id") session_id = sys.argv[1] check_session_status(session_id)Baca data MaxCompute
# Membaca baris data dari MaxCompute untuk session_id tertentu dan menghitung jumlah total baris. import logging import sys from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) def read_rows(session_id): client = get_arrow_client() req = SessionRequest(session_id=session_id) resp = client.get_read_session(req) if resp.status != Status.OK and resp.status != Status.WAIT: logger.info("Get read session failed") return req = ReadRowsRequest(session_id=session_id) if resp.split_count == -1: req.row_index = 0 req.row_count = resp.record_count else: req.split_index = 0 reader = client.read_rows_arrow(req) total_line = 0 while True: record_batch = reader.read() if record_batch is None: break total_line += record_batch.num_rows if reader.get_status() != Status.OK: logger.info("Read rows failed") return logger.info("Total line is:" + str(total_line)) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide session id") session_id = sys.argv[1] read_rows(session_id)
Dokumen terkait
Untuk informasi selengkapnya tentang Storage API MaxCompute, lihat Storage API overview.