All Products
Search
Document Center

MaxCompute:Gunakan PyODPS di DataWorks

Last Updated:Mar 26, 2026

PyODPS adalah SDK Python untuk MaxCompute. Di DataWorks, Anda dapat membuat node PyODPS untuk menulis dan menjalankan kode Python terhadap MaxCompute—dengan titik masuk MaxCompute yang telah dikonfigurasi sebelumnya, sehingga tidak memerlukan pengaturan otentikasi.

Topik ini mencakup perilaku khusus DataWorks dari node PyODPS: batasan lingkungan, kemampuan utama, dan contoh kode.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Ruang kerja DataWorks dengan MaxCompute yang diaktifkan

  • Node PyODPS yang dibuat di DataStudio — baik PyODPS 2 (Python 2) maupun PyODPS 3 (Python 3)

Untuk petunjuk pembuatan node, lihat Kembangkan task PyODPS 2 dan Kembangkan task PyODPS 3.

新建节点

Batasan

Memori

Jika penggunaan memori melebihi batas node, node akan menampilkan pesan Got killed dan menghentikan tugas. Untuk menghindarinya, dorong tugas pemrosesan data ke MaxCompute agar dieksekusi secara terdistribusi, bukan mengunduh data ke lingkungan DataWorks lalu memprosesnya secara lokal. Untuk perbandingan kedua pendekatan tersebut, lihat bagian "Perhatian" dalam Ikhtisar.

Catatan data

Secara default, options.tunnel.use_instance_tunnel bernilai False di node PyODPS DataWorks. Artinya, instance.open_reader menggunakan antarmuka Result, yang mengembalikan maksimal 10.000 catatan data dan memiliki dukungan terbatas untuk tipe data kompleks. Untuk membaca semua catatan atau menangani tipe kompleks seperti Arrays, aktifkan InstanceTunnel. Lihat Baca hasil eksekusi SQL untuk detailnya.

Paket

Sebagian besar paket ilmu data umum telah dipra-instal (lihat Paket yang dipra-instal). Batasan berikut berlaku:

  • atexit tidak didukung. Gunakan try-finally sebagai gantinya.

  • matplotlib tidak tersedia, sehingga memengaruhi fungsi plot pada DataFrame.

  • UDF DataFrame dijalankan dalam sandbox Python dan hanya dapat menggunakan pustaka Python murni serta NumPy. Pustaka pihak ketiga lain seperti pandas tidak didukung di dalam UDF. Untuk operasi non-UDF, NumPy dan pandas tersedia.

  • Paket pihak ketiga yang mengandung kode biner tidak didukung.

Titik masuk MaxCompute

Setiap node PyODPS menyediakan odps dan o sebagai variabel global—keduanya merujuk ke titik masuk MaxCompute. DataWorks mengonfigurasi ini secara otomatis, sehingga tidak diperlukan pengaturan otentikasi dalam kode Anda.

# Periksa apakah tabel pyodps_iris ada.
print(o.exist_table('pyodps_iris'))
Objek entri o hanya dapat mengakses MaxCompute. Objek ini tidak dapat mengakses layanan Alibaba Cloud lainnya, dan otentikasi tambahan tidak dapat diperoleh menggunakan metode seperti o.from_global.

Eksekusi pernyataan SQL

Gunakan execute_sql() atau run_sql() untuk menjalankan pernyataan DDL (data definition language) dan DML (data manipulation language).

o.execute_sql('select * from pyodps_iris')

Untuk pernyataan non-DDL/DML, gunakan metode yang sesuai:

  • run_security_query — untuk pernyataan GRANT dan REVOKE

  • run_xflow atau execute_xflow — untuk operasi API

Untuk dokumentasi SQL lengkap, lihat SQL.

Baca hasil eksekusi SQL

Karena InstanceTunnel tidak diaktifkan secara default di DataWorks, instance.open_reader dibatasi hingga 10.000 catatan dan mungkin tidak mendukung tipe data kompleks. Jika proyek Anda tidak memiliki perlindungan data yang diaktifkan dan Anda perlu membaca semua catatan atau menggunakan tipe kompleks seperti Arrays, aktifkan InstanceTunnel.

Opsi 1: Aktifkan secara global

Berlaku untuk semua panggilan open_reader berikutnya di dalam node.

options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # Hapus batas 10.000 catatan.

with instance.open_reader() as reader:
    # InstanceTunnel aktif. Gunakan reader.count untuk mendapatkan jumlah total catatan.
    pass

Opsi 2: Aktifkan per pembaca

Berlaku hanya untuk panggilan open_reader saat ini.

with instance.open_reader(tunnel=True, limit=False) as reader:
    # InstanceTunnel aktif untuk reader ini. Semua catatan dapat diakses.
    pass

Untuk informasi lebih lanjut, lihat Dapatkan hasil eksekusi pernyataan SQL.

DataFrame

Untuk menjalankan operasi DataFrame di DataWorks, panggil secara eksplisit suatu metode yang langsung dieksekusi seperti execute atau persist. Tanpa ini, operasi tidak akan dipicu.

from odps.df import DataFrame

iris = DataFrame(o.get_table('pyodps_iris'))
# Gunakan execute() untuk memicu operasi dan melakukan iterasi atas hasilnya.
for record in iris[iris.sepalwidth < 3].execute():
    print(record)

Secara default, options.verbose diaktifkan di DataWorks, sehingga detail eksekusi seperti URL Logview dicetak secara otomatis.

Untuk informasi lebih lanjut, lihat DataFrame (tidak direkomendasikan).

Parameter penjadwalan

DataWorks menyuntikkan parameter penjadwalan ke dalam node PyODPS secara berbeda dibandingkan node SQL:

  • Node SQL: ${param_name} langsung digantikan ke dalam string SQL.

  • Node PyODPS: Sebuah dictionary global args diisi sebelum kode dijalankan. Baca nilai parameter menggunakan args['param_name'], bukan ${param_name}.

Desain ini mencegah substitusi string yang tidak disengaja dalam kode Python.

Contoh: Pada tab Scheduling configuration sebuah node PyODPS, atur ds=${yyyymmdd} di bidang Parameters di bawah Basic properties. Kemudian baca nilainya dalam kode:

# Cetak nilai parameter penjadwalan ds, misalnya ds=20161116.
print('ds=' + args['ds'])

Untuk mengkueri data dari partisi yang ditunjuk oleh ds:

o.get_table('table_name').get_partition('ds=' + args['ds'])

Untuk informasi lebih lanjut, lihat Konfigurasikan dan gunakan parameter penjadwalan.

Petunjuk waktu proses

Gunakan parameter hints untuk meneruskan pengaturan waktu proses ke execute_sql. Nilainya harus berupa dict.

o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

Untuk menerapkan pengaturan yang sama ke semua eksekusi SQL dalam node, konfigurasikan options.sql.settings secara global:

from odps import options

options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris')  # Petunjuk diterapkan secara otomatis.

Paket pihak ketiga

Paket yang dipra-instal

Paket-paket berikut telah dipra-instal di node DataWorks:

PackageNode Python 2Node Python 3
requests2.11.12.26.0
numpy1.16.61.18.1
pandas0.24.21.0.5
scipy0.19.01.3.0
scikit_learn0.18.10.22.1
pyarrow0.16.02.0.0
lz42.1.43.1.10
zstandard0.14.10.17.0

Instal paket kustom

Jika paket yang Anda butuhkan belum dipra-instal, gunakan pyodps-pack untuk mengemasnya dan load_resource_package untuk memuatnya di dalam node.

  1. Kemas paket tersebut. Contoh berikut mengemas paket ipaddress:

    pyodps-pack -o ipaddress-bundle.tar.gz ipaddress

    Untuk node Python 2, tambahkan --dwpy27:

    pyodps-pack --dwpy27 -o ipaddress-bundle.tar.gz ipaddress

    Untuk mengurangi ukuran paket, kecualikan paket yang sudah dipra-instal di DataWorks:

    pyodps-pack -o bundle.tar.gz --exclude numpy --exclude pandas <your-package>

    Ukuran total paket yang diunduh tidak boleh melebihi 100 MB.

  2. Unggah dan kirimkan file .tar.gz sebagai resource MaxCompute.

  3. Di dalam node PyODPS, muat dan impor paket tersebut:

    load_resource_package("ipaddress-bundle.tar.gz")
    import ipaddress

Untuk informasi lebih lanjut, lihat Hasilkan paket pihak ketiga untuk PyODPS dan Referensikan paket pihak ketiga di node PyODPS.

Akses MaxCompute dengan akun berbeda

Secara default, objek entri o menggunakan kredensial yang disediakan oleh DataWorks untuk ruang kerja saat ini. Untuk mengakses proyek MaxCompute menggunakan Akun Alibaba Cloud yang berbeda, gunakan metode as_account untuk membuat objek entri terpisah.

Penting

as_account memerlukan PyODPS versi 0.11.3 atau lebih baru.

Prosedur

  1. Berikan izin yang diperlukan kepada akun baru pada proyek tersebut. Lihat Lampiran: Berikan izin ke akun lain.

  2. Di dalam node PyODPS, buat objek entri untuk akun baru:

    import os
    
    # Simpan kredensial dalam variabel lingkungan, jangan hardcoding.
    new_odps = o.as_account(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
    )
  3. Verifikasi bahwa pergantian akun berhasil dengan memeriksa pengguna saat ini:

    print(new_odps.get_project().current_user)

    Jika output sesuai dengan ID AccessKey akun baru, pergantian berhasil.

Contoh

Contoh ini membuat tabel, mengkuerinya menggunakan akun berbeda, dan mencetak hasilnya.

  1. Buat tabel pyodps_iris dan impor data sampel. Untuk petunjuknya, lihat Buat tabel dan unggah data.

    CREATE TABLE IF NOT EXISTS pyodps_iris
    (
      sepallength  DOUBLE COMMENT 'sepal length (cm)',
      sepalwidth   DOUBLE COMMENT 'sepal width (cm)',
      petallength  DOUBLE COMMENT 'petal length (cm)',
      petalwidth   DOUBLE COMMENT 'petal width (cm)',
      name         STRING COMMENT 'type'
    );
  2. Berikan izin akun baru pada proyek dan tabel tersebut. Lihat Lampiran: Berikan izin ke akun lain.

  3. Buat node PyODPS 3 dan jalankan kode berikut. Untuk petunjuknya, lihat Kembangkan task PyODPS 3.

    from odps import ODPS
    import os
    
    # Simpan kredensial dalam variabel lingkungan, jangan hardcoding.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'] = '<your-access-key-id>'
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'] = '<your-access-key-secret>'
    
    od = o.as_account(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
    )
    
    # Kueri baris dengan sepallength > 5.
    with od.execute_sql('SELECT * FROM pyodps_iris WHERE sepallength > 5').open_reader() as reader:
        print(reader.raw)
        for record in reader:
            print(record["sepallength"], record["sepalwidth"],
                  record["petallength"], record["petalwidth"], record["name"])
    
    # Verifikasi pengguna saat ini.
    print(od.get_project().current_user)
  4. Jalankan node tersebut. Outputnya mirip dengan berikut:

    Executing user script with PyODPS 0.11.4.post0
    
    "sepallength","sepalwidth","petallength","petalwidth","name"
    5.4,3.9,1.7,0.4,"Iris-setosa"
    ...
    <User 139xxxxxxxxxxxxx>

Diagnostik

Jika eksekusi node hang tanpa output, tambahkan komentar berikut di bagian atas kode Anda. DataWorks akan mencetak jejak stack semua thread setiap 30 detik.

# -*- dump_traceback: true -*-
Fitur ini memerlukan node PyODPS 3 yang menjalankan versi lebih baru dari 0.11.4.1.

Periksa versi PyODPS

Jalankan kode berikut di node PyODPS untuk mencetak versi yang terinstal:

import odps
print(odps.__version__)
# Contoh output: 0.11.2.3

Versi tersebut juga ditampilkan di log waktu proses node.

image.png

Lampiran: Berikan izin ke akun lain

Untuk mengizinkan Akun Alibaba Cloud lain mengakses proyek dan tabel di ruang kerja saat ini, buat node ODPS SQL dan jalankan perintah berikut. Untuk petunjuk pembuatan node, lihat Buat node ODPS SQL. Untuk informasi lebih lanjut tentang izin, lihat Pengguna dan izin.

-- Tambahkan akun ke proyek.
ADD USER ALIYUN$<account_name>;

-- Berikan izin CreateInstance pada proyek.
GRANT CreateInstance ON PROJECT <project_name> TO USER ALIYUN$<account_name>;

-- Berikan izin Describe dan Select pada tabel.
GRANT Describe, Select ON TABLE <table_name> TO USER ALIYUN$<account_name>;

-- Verifikasi izin tersebut.
SHOW GRANTS FOR ALIYUN$<account_name>;
image.png

Lampiran: Data sampel

Contoh dalam topik ini menggunakan tabel pyodps_iris. Untuk membuat tabel dan mengimpor dataset iris, ikuti Langkah 1 dalam Gunakan node PyODPS untuk mengkueri data berdasarkan kriteria tertentu.

Langkah selanjutnya