全部产品
Search
文档中心

MaxCompute:Mengembangkan tugas PyODPS 3

更新时间:Nov 05, 2025

DataWorks menyediakan node PyODPS 3 yang memungkinkan Anda menulis kode Python untuk pekerjaan MaxCompute serta menjadwalkannya secara berkala. Topik ini menjelaskan cara menggunakan DataWorks untuk mengonfigurasi dan menjadwalkan tugas Python.

Prasyarat

Sebuah node PyODPS 3 telah dibuat. Untuk informasi lebih lanjut, lihat Buat dan Kelola Node MaxCompute.

Informasi latar belakang

PyODPS adalah SDK MaxCompute untuk Python yang menyediakan antarmuka pemrograman sederhana dan nyaman. Dengan PyODPS, Anda dapat menggunakan Python untuk menulis kode pekerjaan MaxCompute, menanyakan tabel dan tampilan MaxCompute, serta mengelola sumber daya MaxCompute. Untuk informasi lebih lanjut, lihat PyODPS. Di DataWorks, Anda dapat menggunakan node PyODPS untuk menjadwalkan tugas Python dan mengintegrasikannya dengan jenis tugas lainnya.

Peringatan

  • Jika paket pihak ketiga perlu dirujuk saat menjalankan node PyODPS pada grup sumber daya DataWorks, gunakan grup sumber daya tanpa server dan buat gambar kustom untuk menginstal paket pihak ketiga.

    Catatan

    Jika paket pihak ketiga diperlukan dalam fungsi yang ditentukan pengguna (UDF) di kode node, metode sebelumnya tidak dapat digunakan untuk menginstal paket pihak ketiga. Untuk informasi tentang cara merujuk paket pihak ketiga dalam UDF, lihat Contoh: Merujuk Paket Pihak Ketiga dalam Python UDF.

  • Untuk meningkatkan versi PyODPS dari node PyODPS 3 yang berjalan pada grup sumber daya serverless, Anda dapat menggunakan fitur manajemen gambar untuk menjalankan perintah /home/tops/bin/pip3 install pyodps==0.12.1 pada grup sumber daya. Untuk meningkatkan versi PyODPS dari node PyODPS 3 yang berjalan pada grup sumber daya eksklusif untuk penjadwalan, Anda dapat menggunakan fitur O&M Assistant untuk menjalankan perintah yang sama pada grup sumber daya. Saat menjalankan perintah, Anda dapat mengubah 0.12.1 ke versi yang ingin Anda tingkatkan. Untuk informasi lebih lanjut tentang fitur manajemen gambar dan fitur O&M Assistant, lihat Kelola Gambar dan Gunakan Fitur O&M Assistant.

  • Untuk mengakses sumber data atau layanan di lingkungan jaringan khusus seperti virtual private cloud (VPC) atau pusat data, gunakan grup sumber daya tanpa server untuk menjalankan node PyODPS dan buat koneksi jaringan antara grup sumber daya dan sumber data atau layanan. Untuk informasi lebih lanjut, lihat Solusi Konektivitas Jaringan.

  • Untuk informasi lebih lanjut tentang sintaks PyODPS, lihat Ikhtisar.

  • Node PyODPS diklasifikasikan menjadi node PyODPS 2 dan node PyODPS 3. Node PyODPS 2 menggunakan Python 2, sedangkan node PyODPS 3 menggunakan Python 3. Anda dapat membuat node PyODPS sesuai dengan versi Python yang digunakan.

  • Jika garis keturunan data tidak dapat dihasilkan dengan mengeksekusi pernyataan SQL dalam node PyODPS dan Peta Data tidak dapat menampilkan garis keturunan data sesuai harapan, masalah tersebut dapat diselesaikan dengan mengonfigurasi parameter penjadwalan secara manual dalam kode node PyODPS. Untuk informasi lebih lanjut tentang cara melihat garis keturunan data, lihat Lihat Garis Keturunan Data. Untuk informasi lebih lanjut tentang cara mengonfigurasi parameter, lihat Konfigurasikan Parameter Hints. Contoh kode berikut menunjukkan cara mendapatkan parameter yang diperlukan untuk menjalankan tugas:

    import os
    ...
    # dapatkan parameter runtime Penjadwal DataWorks
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # atur hints saat mengirimkan tugas
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • Log keluaran node PyODPS dapat mencapai ukuran hingga 4 MB. Hindari memasukkan banyak entri hasil data dalam log keluaran. Sebagai alternatif, fokuskan pada penyediaan log peringatan dan log terkait kemajuan untuk mendapatkan informasi yang relevan.

Batasan

  • Karena spesifikasi sumber daya dalam grup sumber daya eksklusif untuk penjadwalan yang digunakan untuk menjalankan node PyODPS, disarankan agar Anda memproses tidak lebih dari 50 MB data dari mesin lokal. Jika node PyODPS memproses lebih dari 50 MB data, pengecualian kehabisan memori (OOM) mungkin terjadi, dan sistem dapat melaporkan Got killed. Hindari menulis kode pemrosesan data yang berlebihan untuk node PyODPS. Untuk informasi lebih lanjut, lihat Praktik Terbaik untuk Penggunaan Efisien Node PyODPS.

  • Saat menggunakan grup sumber daya tanpa server untuk menjalankan node PyODPS, konfigurasikan jumlah CU yang sesuai berdasarkan jumlah data yang perlu diproses dalam node.

    Catatan

    Saat menjalankan tugas pada grup sumber daya serverless, satu tugas dapat dikonfigurasi hingga 64 CU. Namun, disarankan untuk menjaga pengaturan dalam 16 CU untuk menghindari potensi kekurangan sumber daya yang dapat mempengaruhi startup tugas.

  • Jika sistem melaporkan Got killed, penggunaan memori melebihi batas, dan proses terkait dihentikan. Hindari melakukan operasi pada data lokal. Namun, batasan penggunaan memori tidak berlaku untuk tugas SQL atau DataFrame (tidak termasuk tugas to_pandas) yang dimulai oleh PyODPS.

  • Anda dapat menggunakan pustaka NumPy dan pandas yang sudah diinstal sebelumnya di DataWorks untuk menjalankan fungsi selain UDF. Paket pihak ketiga yang berisi kode biner tidak didukung.

  • Untuk alasan kompatibilitas, options.tunnel.use_instance_tunnel diatur ke False secara default di DataWorks. Untuk mengaktifkan InstanceTunnel secara global, atur parameter ini ke True.

  • Python 3 mendefinisikan bytecode secara berbeda di subversi seperti Python 3.7 dan Python 3.8.

    MaxCompute kompatibel dengan Python 3.7. Klien MaxCompute yang menggunakan subversi lain dari Python 3 dapat mengembalikan kesalahan saat kode dengan sintaks tertentu dijalankan. Misalnya, klien MaxCompute yang menggunakan Python 3.8 dapat mengembalikan kesalahan saat kode dengan sintaks blok finally dijalankan. Disarankan untuk menggunakan Python 3.7.

  • Node PyODPS 3 dapat berjalan pada grup sumber daya tanpa server. Untuk informasi lebih lanjut tentang cara membeli dan menggunakan grup sumber daya tanpa server, lihat Buat dan Gunakan Grup Sumber Daya Tanpa Server.

  • Anda tidak dapat menjalankan beberapa tugas PyODPS 3 pada node PyODPS 3 secara bersamaan.

Contoh pengeditan kode sederhana

Setelah membuat node PyODPS, Anda dapat menulis dan menjalankan kode. Untuk informasi tentang sintaks PyODPS, lihat Ikhtisar.

  • Gunakan titik masuk MaxCompute

    Di DataWorks, setiap node PyODPS mencakup variabel global odps atau o, yang merupakan titik masuk MaxCompute. Oleh karena itu, Anda tidak perlu secara manual menentukan titik masuk MaxCompute.

    print(odps.exist_table('PyODPS_iris'))
  • Eksekusi pernyataan SQL

    Anda dapat mengeksekusi pernyataan SQL dalam node PyODPS. Untuk informasi lebih lanjut, lihat SQL.

    • Secara default, InstanceTunnel dinonaktifkan di DataWorks. Dalam hal ini, instance.open_reader dijalankan menggunakan antarmuka Hasil, dan maksimal 10.000 catatan data dapat dibaca. Anda dapat menggunakan reader.count untuk mendapatkan jumlah catatan data yang dibaca. Untuk membaca semua data secara iteratif, eksekusi pernyataan berikut untuk mengaktifkan InstanceTunnel secara global dan menghapus batasan jumlah catatan data yang akan dibaca:

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # Hapus batasan pada jumlah catatan data yang akan dibaca.
      
      with instance.open_reader() as reader:
        # Gunakan InstanceTunnel untuk membaca semua data.

    • Anda juga dapat menambahkan tunnel=True ke open_reader untuk mengaktifkan InstanceTunnel untuk operasi open_reader saat ini. Anda dapat menambahkan limit=False ke open_reader untuk menghapus batasan pada jumlah catatan data yang akan dibaca untuk operasi open_reader saat ini.

      # Operasi open_reader saat ini dilakukan menggunakan InstanceTunnel, dan semua data dapat dibaca.
      with instance.open_reader(tunnel=True, limit=False) as reader:
  • Konfigurasikan parameter waktu proses

    • Gunakan parameter hints untuk mengonfigurasi parameter waktu proses. Parameter hints bertipe DICT. Untuk informasi lebih lanjut tentang parameter hints, lihat Operasi SET.

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • Jika Anda mengonfigurasi parameter sql.settings untuk konfigurasi global, konfigurasikan parameter waktu proses setiap kali Anda menjalankan kode node.

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # Konfigurasikan parameter hints berdasarkan konfigurasi global.

  • Dapatkan hasil kueri pernyataan SQL

    Gunakan metode open_reader untuk mendapatkan hasil kueri dalam skenario berikut:

    • Pernyataan SQL mengembalikan data terstruktur.

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # Proses setiap catatan.

    • Pernyataan SQL seperti DESC dijalankan. Dalam hal ini, gunakan properti reader.raw untuk mendapatkan hasil kueri mentah.

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      Catatan

      Jika Anda menggunakan parameter penjadwalan kustom dan menjalankan node PyODPS 3 pada tab konfigurasi node, tetapkan parameter penjadwalan ke nilai konstan untuk menentukan waktu tetap. Nilai parameter penjadwalan kustom untuk node PyODPS tidak dapat diganti secara otomatis.

  • Gunakan DataFrame untuk memproses data

    Anda dapat menggunakan DataFrame (tidak direkomendasikan) untuk memproses data.

    • Panggil Operasi API DataFrame

      Operasi API DataFrame tidak dipanggil secara otomatis. Operasi ini hanya dapat dipanggil saat Anda secara eksplisit memanggil metode yang dieksekusi segera.

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # Panggil metode yang dieksekusi segera untuk memproses setiap catatan data.

      Untuk memanggil metode yang dieksekusi segera untuk tampilan data, atur options.interactive ke True.

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # Atur options.interactive ke True di awal kode.
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # Metode ini segera dieksekusi setelah sistem menampilkan informasi.

    • Tampilkan Detail

      Untuk menampilkan detail, Anda harus mengatur options.verbose ke True. Secara default, parameter ini diatur ke True di DataWorks. Sistem menampilkan detail seperti URL Logview selama proses berjalan.

Contoh

Contoh berikut menjelaskan cara menggunakan node PyODPS:

  1. Siapkan dataset dan buat tabel bernama pyodps_iris. Untuk informasi lebih lanjut, lihat Pemrosesan Data DataFrame.

  2. Buat objek DataFrame. Untuk informasi lebih lanjut, lihat Buat Objek DataFrame dari Tabel MaxCompute.

  3. Masukkan kode berikut di editor kode node PyODPS dan jalankan kode:

    from odps.df import DataFrame
    
    # Buat objek DataFrame menggunakan tabel MaxCompute.
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepallength.head(5))

    Informasi berikut dikembalikan:

       sepallength
    0          4.5
    1          5.5
    2          4.9
    3          5.0
    4          6.0

Contoh pengeditan kode lanjutan

Jika Anda ingin node PyODPS dijadwalkan secara berkala, definisikan properti penjadwalan untuk node tersebut. Untuk informasi lebih lanjut, lihat Ikhtisar.

Konfigurasikan parameter penjadwalan

Di panel navigasi kanan tab konfigurasi node, klik tab Properties. Di bagian Scheduling Parameter tab Properti, konfigurasikan parameter penjadwalan untuk node tersebut. Metode mengonfigurasi parameter penjadwalan untuk node PyODPS berbeda dari metode untuk node SQL. Untuk informasi lebih lanjut, lihat Konfigurasikan Parameter Penjadwalan untuk Berbagai Jenis Node.

Berbeda dengan node SQL di DataWorks, string seperti ${param_name} tidak diganti dalam kode node PyODPS. Sebagai gantinya, sebuah kamus bernama args ditambahkan ke node PyODPS sebagai variabel global sebelum kode node dijalankan. Anda dapat memperoleh parameter penjadwalan dari kamus tersebut. Dengan cara ini, kode Python tidak terpengaruh. Misalnya, jika Anda mengatur ds=${yyyymmdd} di bagian Scheduling Parameter tab Properti, Anda dapat menjalankan perintah berikut untuk mendapatkan nilai parameter:

print('ds=' + args['ds'])
ds=20161116
Catatan

Anda dapat menjalankan perintah berikut untuk mendapatkan partisi bernama ds:

o.get_table('table_name').get_partition('ds=' + args['ds'])

Untuk informasi lebih lanjut tentang cara mengembangkan tugas PyODPS dalam skenario lain, lihat topik berikut:

Apa yang harus dilakukan selanjutnya

  • Tentukan Apakah Skrip Shell Kustom Berhasil Dijalankan: Logika untuk menentukan apakah skrip Python kustom berhasil dijalankan sama dengan logika untuk menentukan apakah skrip Shell kustom berhasil dijalankan. Anda dapat menggunakan metode ini untuk menentukan apakah skrip Python kustom berhasil dijalankan.

  • Sebarkan Node PyODPS 3: Jika Anda menggunakan ruang kerja dalam mode standar, Anda harus menyebarkan node PyODPS 3 ke lingkungan produksi sebelum node tersebut dapat dijadwalkan secara berkala.

  • Lakukan O&M pada Node PyODPS 3: Setelah node PyODPS 3 disebarkan ke Pusat Operasi di lingkungan produksi, Anda dapat melakukan operasi O&M pada node tersebut di Pusat Operasi.

  • Pelajari FAQ tentang PyODPS: Anda dapat mempelajari FAQ tentang PyODPS. Dengan cara ini, Anda dapat mengidentifikasi dan menyelesaikan masalah secara efisien saat terjadi pengecualian.

FAQ

T: Saat saya menjalankan node PyODPS 3 untuk mengumpulkan data aplikasi pihak ketiga seperti data Lark menggunakan kode dan mengimpor data ke DataWorks, node tersebut dapat berhasil dijalankan pada mesin lokal dan data dapat dikumpulkan. Namun, kesalahan batas waktu respons dilaporkan saat node PyODPS 3 dijalankan di Pusat Operasi setelah dikomit ke lingkungan produksi. Mengapa ini terjadi?

J: Pergi ke bagian Pengaturan Keamanan halaman Ruang Kerja di Pusat Manajemen konsol DataWorks, konfigurasikan daftar putih sandbox, lalu tambahkan informasi alamat aplikasi pihak ketiga untuk mengizinkan node PyODPS 3 mengakses aplikasi pihak ketiga. Gambar berikut menunjukkan contoh konfigurasi.

image